1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile('[\x00/]')
32 class Directory(FreshBase):
33     """Generic directory object, backed by a dict.
35     Consists of a set of entries with the key representing the filename
36     and the value referencing a File or Directory object.
    """
    # NOTE(review): this listing has elided lines (gaps in the original
    # numbering), so several method bodies below are incomplete as shown.

39     def __init__(self, parent_inode, inodes, apiconfig, enable_write):
40         """parent_inode is the integer inode number"""
42         super(Directory, self).__init__()
45         if not isinstance(parent_inode, int):
46             raise Exception("parent_inode should be an int")
47         self.parent_inode = parent_inode
        # apiconfig is a callable returning the cluster config dict
        # (see forward_slash_subst below, which calls self.apiconfig()).
49         self.apiconfig = apiconfig
51         self._mtime = time.time()
52         self._enable_write = enable_write

54     def forward_slash_subst(self):
        # Lazily fetch and cache the ForwardSlashNameSubstitution setting
        # from the cluster config; cached on first use in self._fsns.
55         if not hasattr(self, '_fsns'):
57             config = self.apiconfig()
59             self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
61             # old API server with no FSNS config
            # Empty string or "/" means substitution is effectively disabled.
64             if self._fsns == '' or self._fsns == '/':

68     def unsanitize_filename(self, incoming):
69         """Replace ForwardSlashNameSubstitution value with /"""
70         fsns = self.forward_slash_subst()
71         if isinstance(fsns, str):
72             return incoming.replace(fsns, '/')

76     def sanitize_filename(self, dirty):
77         """Replace disallowed filename characters according to
78         ForwardSlashNameSubstitution in self.api_config."""
79         # '.' and '..' are not reachable if API server is newer than #6277
89         fsns = self.forward_slash_subst()
90         if isinstance(fsns, str):
91             dirty = dirty.replace('/', fsns)
        # Any remaining NUL or '/' characters become underscores.
92         return _disallowed_filename_characters.sub('_', dirty)

95     # Overridden by subclasses to implement logic to update the
96     # entries dict when the directory is stale

101     # Only used when computing the size of the disk footprint of the directory

109     def checkupdate(self):
        # NOTE(review): the try: line pairing with this except is elided
        # from the listing.
113         except apiclient.errors.HttpError as e:

118     def __getitem__(self, item):
119         return self._entries[item]

        # items(): snapshot of (name, entry) pairs.
124         return list(self._entries.items())

128     def __contains__(self, k):
129         return k in self._entries

        # __len__: number of directory entries.
134         return len(self._entries)

        # fresh(): mark this directory up to date and bump it in the inode cache.
137         self.inodes.touch(self)
138         super(Directory, self).fresh()

140     def merge(self, items, fn, same, new_entry):
141         """Helper method for updating the contents of the directory.
143         Takes a list describing the new contents of the directory, reuse
144         entries that are the same in both the old and new lists, create new
145         entries, and delete old entries missing from the new list.
147         :items: iterable with new directory contents
149         :fn: function to take an entry in 'items' and return the desired file or
150         directory name, or None if this entry should be skipped
152         :same: function to compare an existing entry (a File or Directory
153         object) with an entry in the items list to determine whether to keep
156         :new_entry: function to create a new directory entry (File or Directory
157         object) from an entry in the items list.
        """
161         oldentries = self._entries
165             name = self.sanitize_filename(fn(i))
167             if name in oldentries and same(oldentries[name], i):
168                 # move existing directory entry over
169                 self._entries[name] = oldentries[name]
172                 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
173                 # create new directory entry
176                 self._entries[name] = self.inodes.add_entry(ent)
179         # delete any other directory entries that were not in found in 'items'
181             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
182             self.inodes.invalidate_entry(self, i)
183             self.inodes.del_entry(oldentries[i])
        # Tell the kernel this directory changed and record the new mtime.
187             self.inodes.invalidate_inode(self)
188         self._mtime = time.time()

        # in_use(): True if this directory or any child entry is in use.
193         if super(Directory, self).in_use():
195         for v in self._entries.values():

200     def has_ref(self, only_children):
201         if super(Directory, self).has_ref(only_children):
203         for v in self._entries.values():

209         """Delete all entries"""
210         oldentries = self._entries
213             oldentries[n].clear()
214             self.inodes.del_entry(oldentries[n])

217     def kernel_invalidate(self):
218         # Invalidating the dentry on the parent implies invalidating all paths
220         parent = self.inodes[self.parent_inode]
222         # Find self on the parent in order to invalidate this path.
223         # Calling the public items() method might trigger a refresh,
224         # which we definitely don't want, so read the internal dict directly.
225         for k,v in parent._entries.items():
227                 self.inodes.invalidate_entry(parent, k)

    # The operations below are unsupported on a generic Directory;
    # writable subclasses override them.
239     def want_event_subscribe(self):
240         raise NotImplementedError()

242     def create(self, name):
243         raise NotImplementedError()

245     def mkdir(self, name):
246         raise NotImplementedError()

248     def unlink(self, name):
249         raise NotImplementedError()

251     def rmdir(self, name):
252         raise NotImplementedError()

254     def rename(self, name_old, name_new, src):
255         raise NotImplementedError()
258 class CollectionDirectoryBase(Directory):
259     """Represent an Arvados Collection as a directory.
261     This class is used for Subcollections, and is also the base class for
262     CollectionDirectory, which implements collection loading/saving on
265     Most operations act only the underlying Arvados `Collection` object. The
266     `Collection` object signals via a notify callback to
267     `CollectionDirectoryBase.on_event` that an item was added, removed or
268     modified. FUSE inodes and directory entries are created, deleted or
269     invalidated in response to these events.
    """
    # NOTE(review): lines are elided from this listing; some bodies below
    # are incomplete as shown.

273     def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
274         super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
275         self.apiconfig = apiconfig
276         self.collection = collection
        # collection_root: the CollectionDirectory at the root of this
        # collection tree (flush() delegates to it).
277         self.collection_root = collection_root
278         self.collection_record_file = None

280     def new_entry(self, name, item, mtime):
        """Create (or reattach) the FUSE entry for one collection item."""
281         name = self.sanitize_filename(name)
282         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            # Item already had a FUSE entry; it may only be reused if that
            # entry was previously marked dead (reparenting case).
283             if item.fuse_entry.dead is not True:
284                 raise Exception("Can only reparent dead inode entry")
285             if item.fuse_entry.inode is None:
286                 raise Exception("Reparented entry must still have valid inode")
287             item.fuse_entry.dead = False
288             self._entries[name] = item.fuse_entry
289         elif isinstance(item, arvados.collection.RichCollectionBase):
            # Subcollection -> nested directory, populated recursively.
290             self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item, self.collection_root))
291             self._entries[name].populate(mtime)
            # Otherwise the item is a file.
293             self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
294         item.fuse_entry = self._entries[name]

296     def on_event(self, event, collection, name, item):
297         # These are events from the Collection object (ADD/DEL/MOD)
298         # emitted by operations on the Collection object (like
299         # "mkdirs" or "remove"), and by "update", which we need to
300         # synchronize with our FUSE objects that are assigned inodes.
301         if collection == self.collection:
302             name = self.sanitize_filename(name)
305             # It's possible for another thread to have llfuse.lock and
306             # be waiting on collection.lock.  Meanwhile, we released
307             # llfuse.lock earlier in the stack, but are still holding
308             # on to the collection lock, and now we need to re-acquire
309             # llfuse.lock.  If we don't release the collection lock,
310             # we'll deadlock where we're holding the collection lock
311             # waiting for llfuse.lock and the other thread is holding
312             # llfuse.lock and waiting for the collection lock.
314             # The correct locking order here is to take llfuse.lock
315             # first, then the collection lock.
317             # Since collection.lock is an RLock, it might be locked
318             # multiple times, so we need to release it multiple times,
319             # keep a count, then re-lock it the correct number of
                # times.  (NOTE(review): the loop lines doing the counted
                # release are elided from this listing.)
325                 self.collection.lock.release()
332                 with self.collection.lock:
333                     if event == arvados.collection.ADD:
334                         self.new_entry(name, item, self.mtime())
335                     elif event == arvados.collection.DEL:
336                         ent = self._entries[name]
337                         del self._entries[name]
338                         self.inodes.invalidate_entry(self, name)
339                         self.inodes.del_entry(ent)
340                     elif event == arvados.collection.MOD:
341                         if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
342                             self.inodes.invalidate_inode(item.fuse_entry)
343                         elif name in self._entries:
344                             self.inodes.invalidate_inode(self._entries[name])
                    # The ".arvados#collection" record file (if any) is now stale.
346                     if self.collection_record_file is not None:
347                         self.collection_record_file.invalidate()
348                         self.inodes.invalidate_inode(self.collection_record_file)
                # Re-acquire the collection lock the counted number of times.
351                     self.collection.lock.acquire()

354     def populate(self, mtime):
        """Create FUSE entries for every item already in the collection
        and subscribe to future change events."""
356         with self.collection.lock:
357             self.collection.subscribe(self.on_event)
358             for entry, item in self.collection.items():
359                 self.new_entry(entry, item, self.mtime())

        # writable(): both the mount and the underlying collection must allow writes.
362         return self._enable_write and self.collection.writable()

        # flush(): delegate to the root CollectionDirectory, which owns saving.
366             self.collection_root.flush()

370     def create(self, name):
371         if not self.writable():
372             raise llfuse.FUSEError(errno.EROFS)
373         with llfuse.lock_released:
            # Creating then closing an empty file adds it to the collection.
374             self.collection.open(name, "w").close()

378     def mkdir(self, name):
379         if not self.writable():
380             raise llfuse.FUSEError(errno.EROFS)
381         with llfuse.lock_released:
382             self.collection.mkdirs(name)

386     def unlink(self, name):
387         if not self.writable():
388             raise llfuse.FUSEError(errno.EROFS)
389         with llfuse.lock_released:
390             self.collection.remove(name)

395     def rmdir(self, name):
396         if not self.writable():
397             raise llfuse.FUSEError(errno.EROFS)
398         with llfuse.lock_released:
399             self.collection.remove(name)

404     def rename(self, name_old, name_new, src):
405         if not self.writable():
406             raise llfuse.FUSEError(errno.EROFS)
            # Cross-type renames only make sense between collection directories.
408         if not isinstance(src, CollectionDirectoryBase):
409             raise llfuse.FUSEError(errno.EPERM)
            # Enforce POSIX rename semantics on the existing target (tgt):
            # file->file is allowed to overwrite; dir->nonempty-dir, dir->file
            # and file->dir are errors.
414             if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
416             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
418                     raise llfuse.FUSEError(errno.ENOTEMPTY)
419             elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
420                 raise llfuse.FUSEError(errno.ENOTDIR)
421             elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
422                 raise llfuse.FUSEError(errno.EISDIR)
424         with llfuse.lock_released:
425             self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)

        # clear(): drop all entries, then drop the collection reference.
430         super(CollectionDirectoryBase, self).clear()
431         self.collection = None
434 class CollectionDirectory(CollectionDirectoryBase):
435     """Represents the root of a directory tree representing a collection."""
    # NOTE(review): lines are elided from this listing; some bodies below
    # are incomplete as shown.

437     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
438         super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
440         self.num_retries = num_retries
        # Poll interval: half the blob signature TTL, so signatures are
        # refreshed before they expire.
443             self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
445             _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
446             self._poll_time = 60*60
        # collection_record may be a full record dict or just a locator string.
448         if isinstance(collection_record, dict):
449             self.collection_locator = collection_record['uuid']
450             self._mtime = convertTime(collection_record.get('modified_at'))
452             self.collection_locator = collection_record
454         self._manifest_size = 0
455         if self.collection_locator:
            # Only UUID-addressed collections (not PDH) can be writable.
456             self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
457         self._updating_lock = threading.Lock()

        # same(): does record i refer to this collection (by uuid or PDH)?
460         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

463         return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

        # flush(): save pending changes back to the API server.
467         if not self.writable():
469         with llfuse.lock_released:
470             with self._updating_lock:
471                 if self.collection.committed():
472                     self.collection.update()
474                     self.collection.save()
475                 self.new_collection_record(self.collection.api_response())

477     def want_event_subscribe(self):
478         return (uuid_pattern.match(self.collection_locator) is not None)

480     def new_collection(self, new_collection_record, coll_reader):
        """Adopt a freshly fetched collection object and populate entries."""
483         self.collection = coll_reader
484         self.new_collection_record(new_collection_record)
485         self.populate(self.mtime())

487     def new_collection_record(self, new_collection_record):
488         if not new_collection_record:
489             raise Exception("invalid new_collection_record")
490         self._mtime = convertTime(new_collection_record.get('modified_at'))
491         self._manifest_size = len(new_collection_record["manifest_text"])
492         self.collection_locator = new_collection_record["uuid"]
493         if self.collection_record_file is not None:
494             self.collection_record_file.invalidate()
495             self.inodes.invalidate_inode(self.collection_record_file)
496             _logger.debug("%s invalidated collection record file", self)

        # uuid(): locator naming this collection.
500         return self.collection_locator

        # update(): refresh contents from the API server (or Keep).
505         if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
506             # It's immutable, nothing to update
509         if self.collection_locator is None:
510             # No collection locator to retrieve from
514         new_collection_record = None
516             with llfuse.lock_released:
517                 self._updating_lock.acquire()
521                 _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
523                 if self.collection is not None:
524                     # Already have a collection object
525                     self.collection.update()
526                     new_collection_record = self.collection.api_response()
                    # NOTE(review): the next line has unbalanced parentheses as
                    # shown — likely intended: max(... // (64 * 1024 * 1024) - 1, 0).
                    # Confirm against upstream before fixing.
528                     get_threads = max(self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 0)
529                     # Create a new collection object
530                     if uuid_pattern.match(self.collection_locator):
531                         coll_reader = arvados.collection.Collection(
532                             self.collection_locator, self.api, self.api.keep,
533                             num_retries=self.num_retries,
534                             get_threads=get_threads)
                        # PDH locator: read-only reader is sufficient.
536                         coll_reader = arvados.collection.CollectionReader(
537                             self.collection_locator, self.api, self.api.keep,
538                             num_retries=self.num_retries,
539                             get_threads=get_threads)
541                     new_collection_record = coll_reader.api_response() or {}
542                     # If the Collection only exists in Keep, there will be no API
543                     # response.  Fill in the fields we need.
544                     if 'uuid' not in new_collection_record:
545                         new_collection_record['uuid'] = self.collection_locator
546                     if "portable_data_hash" not in new_collection_record:
547                         new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
548                     if 'manifest_text' not in new_collection_record:
549                         new_collection_record['manifest_text'] = coll_reader.manifest_text()
550                     if 'storage_classes_desired' not in new_collection_record:
551                         new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
553             # end with llfuse.lock_released, re-acquire lock
555             if new_collection_record is not None:
556                 if coll_reader is not None:
557                     self.new_collection(new_collection_record, coll_reader)
559                     self.new_collection_record(new_collection_record)
            # Always release the update lock, even on error paths below.
563                 self._updating_lock.release()
564         except arvados.errors.NotFoundError as e:
565             _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
566         except arvados.errors.ArgumentError as detail:
567             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
568             if new_collection_record is not None and "manifest_text" in new_collection_record:
569                 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
571             _logger.exception("arv-mount %s: error", self.collection_locator)
572             if new_collection_record is not None and "manifest_text" in new_collection_record:
573                 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])

578     def collection_record(self):
        # Used by the ".arvados#collection" virtual file.
580         return self.collection.api_response()

584     def __getitem__(self, item):
585         if item == '.arvados#collection':
586             if self.collection_record_file is None:
587                 self.collection_record_file = FuncToJSONFile(
588                     self.inode, self.collection_record)
589                 self.inodes.add_entry(self.collection_record_file)
590             self.invalidate()       # use lookup as a signal to force update
591             return self.collection_record_file
593             return super(CollectionDirectory, self).__getitem__(item)

595     def __contains__(self, k):
596         if k == '.arvados#collection':
599             return super(CollectionDirectory, self).__contains__(k)

601     def invalidate(self):
602         if self.collection_record_file is not None:
603             self.collection_record_file.invalidate()
604             self.inodes.invalidate_inode(self.collection_record_file)
605         super(CollectionDirectory, self).invalidate()

        # persisted(): True when backed by a server-side collection record.
608         return (self.collection_locator is not None)

611         # This is an empirically-derived heuristic to estimate the memory used
612         # to store this collection's metadata.  Calculating the memory
613         # footprint directly would be more accurate, but also more complicated.
614         return self._manifest_size * 128

        # finalize(): save outstanding changes and stop background threads.
617         if self.collection is not None:
619                 self.collection.save()
620             self.collection.stop_threads()

        # clear(): stop threads before discarding entries.
623         if self.collection is not None:
624             self.collection.stop_threads()
625         super(CollectionDirectory, self).clear()
626         self._manifest_size = 0
629 class TmpCollectionDirectory(CollectionDirectoryBase):
630     """A directory backed by an Arvados collection that never gets saved.
632     This supports using Keep as scratch space. A userspace program can
633     read the .arvados#collection file to get a current manifest in
634     order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    # Subclass that suppresses saving to the API server (save/save_new
    # overrides are elided from this listing).
638     class UnsaveableCollection(arvados.collection.Collection):

644     def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
645         collection = self.UnsaveableCollection(
646             api_client=api_client,
647             keep_client=api_client.keep,
648             num_retries=num_retries,
649             storage_classes_desired=storage_classes)
650         # This is always enable_write=True because it never tries to
651         # save to the backend
652         super(TmpCollectionDirectory, self).__init__(
653             parent_inode, inodes, api_client.config, True, collection, self)
654         self.populate(self.mtime())

656     def on_event(self, *args, **kwargs):
657         super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
658         if self.collection_record_file is None:
661         # See discussion in CollectionDirectoryBase.on_event
        # (counted release/re-acquire of the collection RLock; loop lines
        # are elided from this listing).
665             self.collection.lock.release()
672             with self.collection.lock:
673                 self.collection_record_file.invalidate()
674                 self.inodes.invalidate_inode(self.collection_record_file)
675                 _logger.debug("%s invalidated collection record", self)
678                 self.collection.lock.acquire()

681     def collection_record(self):
        # Synthesize a minimal record for the ".arvados#collection" file.
682         with llfuse.lock_released:
685                 "manifest_text": self.collection.manifest_text(),
686                 "portable_data_hash": self.collection.portable_data_hash(),
687                 "storage_classes_desired": self.collection.storage_classes_desired(),

690     def __contains__(self, k):
691         return (k == '.arvados#collection' or
692                 super(TmpCollectionDirectory, self).__contains__(k))

695     def __getitem__(self, item):
696         if item == '.arvados#collection':
697             if self.collection_record_file is None:
698                 self.collection_record_file = FuncToJSONFile(
699                     self.inode, self.collection_record)
700                 self.inodes.add_entry(self.collection_record_file)
701             return self.collection_record_file
702         return super(TmpCollectionDirectory, self).__getitem__(item)

    # Never subscribes to server events: the collection exists only locally.
713     def want_event_subscribe(self):

        # finalize(): just stop background threads; nothing to save.
717         self.collection.stop_threads()

719     def invalidate(self):
720         if self.collection_record_file:
721             self.collection_record_file.invalidate()
722         super(TmpCollectionDirectory, self).invalidate()
725 class MagicDirectory(Directory):
726     """A special directory that logically contains the set of all extant keep locators.
728     When a file is referenced by lookup(), it is tested to see if it is a valid
729     keep locator to a manifest, and if so, loads the manifest contents as a
730     subdirectory of this directory with the locator as the directory name.
731     Since querying a list of all extant keep locators is impractical, only
732     collections that have already been accessed are visible to readdir().
    """

    # README_TEXT shown to users browsing this directory.
    # NOTE(review): the assignment line opening this string is elided
    # from the listing.
737 This directory provides access to Arvados collections as subdirectories listed
738 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
739 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
740 (in the form 'zzzzz-j7d0g-1234567890abcde').
742 Note that this directory will appear empty until you attempt to access a
743 specific collection or project subdirectory (such as trying to 'cd' into it),
744 at which point the collection or project will actually be looked up on the server
745 and the directory will appear if it exists.

749     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
750         super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
752         self.num_retries = num_retries
753         self.pdh_only = pdh_only
754         self.storage_classes = storage_classes

756     def __setattr__(self, name, value):
757         super(MagicDirectory, self).__setattr__(name, value)
758         # When we're assigned an inode, add a README.
759         if ((name == 'inode') and (self.inode is not None) and
760               (not self._entries)):
761             self._entries['README'] = self.inodes.add_entry(
762                 StringFile(self.inode, self.README_TEXT, time.time()))
763             # If we're the root directory, add an identical by_id subdirectory.
764             if self.inode == llfuse.ROOT_INODE:
765                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
766                     self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                    self.pdh_only))

769     def __contains__(self, k):
        # Membership test doubles as on-demand lookup: an unknown but valid
        # locator is fetched from the server and added to _entries.
770         if k in self._entries:
773         if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
779             if group_uuid_pattern.match(k):
780                 project = self.api.groups().list(
781                     filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
782                 if project[u'items_available'] == 0:
784                 e = self.inodes.add_entry(ProjectDirectory(
785                     self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
786                     project[u'items'][0], storage_classes=self.storage_classes))
788                 e = self.inodes.add_entry(CollectionDirectory(
789                     self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))
            # If the fetched entry turned out invalid, discard it again.
792                 if k not in self._entries:
795                     self.inodes.del_entry(e)
798                 self.inodes.invalidate_entry(self, k)
799                 self.inodes.del_entry(e)
801         except Exception as ex:
802             _logger.exception("arv-mount lookup '%s':", k)
804                 self.inodes.del_entry(e)

807     def __getitem__(self, item):
        # Relies on __contains__ to populate _entries on demand.
809             return self._entries[item]
811             raise KeyError("No collection with id " + item)

816     def want_event_subscribe(self):
817         return not self.pdh_only
820 class TagsDirectory(Directory):
821     """A special directory that contains as subdirectories all tags visible to the user."""

823     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
824         super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
826         self.num_retries = num_retries
828         self._poll_time = poll_time

831     def want_event_subscribe(self):

        # update(): fetch the distinct tag names and merge them into entries.
836         with llfuse.lock_released:
837             tags = self.api.links().list(
838                 filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
839                 select=['name'], distinct=True, limit=1000
840                 ).execute(num_retries=self.num_retries)
        # _extra holds tag names discovered via direct lookup (see __getitem__).
842             self.merge(tags['items']+[{"name": n} for n in self._extra],
844                        lambda a, i: a.tag == i['name'],
845                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
846                                               i['name'], poll=self._poll, poll_time=self._poll_time))

850     def __getitem__(self, item):
851         if super(TagsDirectory, self).__contains__(item):
852             return super(TagsDirectory, self).__getitem__(item)
        # Unknown tag: check the server directly, remember it in _extra.
853         with llfuse.lock_released:
854             tags = self.api.links().list(
855                 filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
856                 ).execute(num_retries=self.num_retries)
858             self._extra.add(item)
860             return super(TagsDirectory, self).__getitem__(item)

864     def __contains__(self, k):
865         if super(TagsDirectory, self).__contains__(k):
875 class TagDirectory(Directory):
876     """A special directory that contains as subdirectories all collections visible
877     to the user that are tagged with a particular tag.
    """

880     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
881                  poll=False, poll_time=60):
882         super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
884         self.num_retries = num_retries
887         self._poll_time = poll_time

889     def want_event_subscribe(self):

        # update(): list collections carrying this tag and merge into entries.
894         with llfuse.lock_released:
895             taggedcollections = self.api.links().list(
896                 filters=[['link_class', '=', 'tag'],
897                          ['name', '=', self.tag],
898                          ['head_uuid', 'is_a', 'arvados#collection']],
900                 ).execute(num_retries=self.num_retries)
901         self.merge(taggedcollections['items'],
902                    lambda i: i['head_uuid'],
903                    lambda a, i: a.collection_locator == i['head_uuid'],
904                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']))
907 class ProjectDirectory(Directory):
908 """A special directory that contains the contents of a project."""
910 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
911 poll=True, poll_time=3, storage_classes=None):
912 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
914 self.num_retries = num_retries
915 self.project_object = project_object
916 self.project_object_file = None
917 self.project_uuid = project_object['uuid']
919 self._poll_time = poll_time
920 self._updating_lock = threading.Lock()
921 self._current_user = None
922 self._full_listing = False
923 self.storage_classes = storage_classes
925 def want_event_subscribe(self):
928 def createDirectory(self, i):
929 if collection_uuid_pattern.match(i['uuid']):
930 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
931 elif group_uuid_pattern.match(i['uuid']):
932 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
933 i, self._poll, self._poll_time, self.storage_classes)
934 elif link_uuid_pattern.match(i['uuid']):
935 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
936 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
939 elif uuid_pattern.match(i['uuid']):
940 return ObjectFile(self.parent_inode, i)
945 return self.project_uuid
948 self._full_listing = True
949 return super(ProjectDirectory, self).items()
953 if i['name'] is None or len(i['name']) == 0:
955 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
956 # collection or subproject
958 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
961 elif 'kind' in i and i['kind'].startswith('arvados#'):
963 return "{}.{}".format(i['name'], i['kind'][8:])
970 if self.project_object_file == None:
971 self.project_object_file = ObjectFile(self.inode, self.project_object)
972 self.inodes.add_entry(self.project_object_file)
974 if not self._full_listing:
978 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
979 return a.uuid() == i['uuid']
980 elif isinstance(a, ObjectFile):
981 return a.uuid() == i['uuid'] and not a.stale()
985 with llfuse.lock_released:
986 self._updating_lock.acquire()
990 if group_uuid_pattern.match(self.project_uuid):
991 self.project_object = self.api.groups().get(
992 uuid=self.project_uuid).execute(num_retries=self.num_retries)
993 elif user_uuid_pattern.match(self.project_uuid):
994 self.project_object = self.api.users().get(
995 uuid=self.project_uuid).execute(num_retries=self.num_retries)
996 # do this in 2 steps until #17424 is fixed
997 contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
999 num_retries=self.num_retries,
1000 uuid=self.project_uuid,
1001 filters=[["uuid", "is_a", "arvados#group"],
1002 ["groups.group_class", "in", ["project","filter"]]]))
1003 contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"],
1004 arvados.util.keyset_list_all(self.api.groups().contents,
1006 num_retries=self.num_retries,
1007 uuid=self.project_uuid,
1008 filters=[["uuid", "is_a", "arvados#collection"]])))
1011 # end with llfuse.lock_released, re-acquire lock
1013 self.merge(contents,
1016 self.createDirectory)
1019 self._updating_lock.release()
1021 def _add_entry(self, i, name):
1022 ent = self.createDirectory(i)
1023 self._entries[name] = self.inodes.add_entry(ent)
1024 return self._entries[name]
1028 def __getitem__(self, k):
1029 if k == '.arvados#project':
1030 return self.project_object_file
1031 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1032 return super(ProjectDirectory, self).__getitem__(k)
1033 with llfuse.lock_released:
1034 k2 = self.unsanitize_filename(k)
1036 namefilter = ["name", "=", k]
1038 namefilter = ["name", "in", [k, k2]]
1039 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
1040 ["group_class", "in", ["project","filter"]],
1042 limit=2).execute(num_retries=self.num_retries)["items"]
1044 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
1046 limit=2).execute(num_retries=self.num_retries)["items"]
1048 if len(contents) > 1 and contents[1]['name'] == k:
1049 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1051 contents = [contents[1]]
1052 name = self.sanitize_filename(self.namefn(contents[0]))
1055 return self._add_entry(contents[0], name)
1060 def __contains__(self, k):
1061 if k == '.arvados#project':
1073 if not self._enable_write:
1075 with llfuse.lock_released:
1076 if not self._current_user:
1077 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1078 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1080 def persisted(self):
1085 def mkdir(self, name):
1086 if not self.writable():
1087 raise llfuse.FUSEError(errno.EROFS)
1090 with llfuse.lock_released:
1092 "owner_uuid": self.project_uuid,
1094 "manifest_text": "" }
1095 if self.storage_classes is not None:
1096 c["storage_classes_desired"] = self.storage_classes
1098 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1099 except Exception as e:
1102 except apiclient_errors.Error as error:
1103 _logger.error(error)
1104 raise llfuse.FUSEError(errno.EEXIST)
1108 def rmdir(self, name):
1109 if not self.writable():
1110 raise llfuse.FUSEError(errno.EROFS)
1112 if name not in self:
1113 raise llfuse.FUSEError(errno.ENOENT)
1114 if not isinstance(self[name], CollectionDirectory):
1115 raise llfuse.FUSEError(errno.EPERM)
1116 if len(self[name]) > 0:
1117 raise llfuse.FUSEError(errno.ENOTEMPTY)
1118 with llfuse.lock_released:
1119 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
    def rename(self, name_old, name_new, src):
        """Move the collection `name_old` out of project directory `src`
        into this project under the name `name_new`.

        Raises EROFS when this directory is not writable and EPERM when
        the source is not a ProjectDirectory, the entry is not a
        collection, or the target name already exists.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)
        # Reparent and rename the collection on the server in one call.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        # Tell the kernel the old path no longer resolves.
        self.inodes.invalidate_entry(src, name_old)
    def child_event(self, ev):
        """Handle a change notification for an object owned by this project.

        `ev` is an event record carrying `object_uuid` and (optionally)
        `properties.old_attributes` / `properties.new_attributes`;
        updates self._entries to match the rename/create/delete it
        describes and invalidates kernel cache entries as needed.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        # Map object records to the filenames they appear as in the mount.
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
        if old_attrs.get("is_trashed"):
            # Was previously deleted
        if new_attrs.get("is_trashed"):
        if new_name != old_name:
            # Drop the stale entry under the old name, if we had one.
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)
                # Re-attach the existing directory object under the new name.
                self._entries[new_name] = ent
                self._add_entry(new_attrs, new_name)
            elif ent is not None:
                # No new name: the object is gone from this project.
                self.inodes.del_entry(ent)
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me.

    Each top-level entry is a ProjectDirectory, keyed by the sharing
    group's name or, for users, by "first_name last_name".
    """
    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
                 poll=False, poll_time=60, storage_classes=None):
        """parent_inode is the integer inode of the parent directory.

        `api` is the Arvados API client (its config callable is handed to
        Directory); `num_retries` is the retry count for API calls;
        `poll`/`poll_time` control periodic refresh; `storage_classes`
        is forwarded to the ProjectDirectory children.
        NOTE(review): `exclude` is accepted but not stored here — confirm
        it is unused before relying on it.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        # Cache the current user record; used to decide which projects
        # count as shared *with* me rather than owned *by* me.
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll_time = poll_time
        # Serializes concurrent update() runs.
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes
            # Release the global FUSE lock while doing (blocking) API
            # calls; _updating_lock keeps concurrent refreshes serialized.
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                # Build uuid -> record map (`objects`), plus the shared
                # "root" items (`roots`) and their owners (`root_owners`).
                methods = self.api._rootDesc.get('resources')["groups"]['methods']
                if 'httpMethod' in methods.get('shared', {}):
                    # Newer API server: page through the groups/shared
                    # endpoint, keyset-style ordered by uuid.
                        resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
                                                        include="owner_uuid").execute()
                        if not resp["items"]:
                        # Advance the keyset page past the last uuid seen.
                        page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
                        for r in resp["items"]:
                            objects[r["uuid"]] = r
                            roots.append(r["uuid"])
                        # "included" carries the owner records requested
                        # via include="owner_uuid".
                        for r in resp["included"]:
                            objects[r["uuid"]] = r
                            root_owners.add(r["uuid"])
                    # Fallback for older API servers: list every project
                    # and reconstruct the sharing roots client-side.
                    all_projects = list(arvados.util.keyset_list_all(
                        self.api.groups().list,
                        num_retries=self.num_retries,
                        filters=[['group_class','in',['project','filter']]],
                        select=["uuid", "owner_uuid"]))
                    for ob in all_projects:
                        objects[ob['uuid']] = ob

                    current_uuid = self.current_user['uuid']
                    # A project is a sharing root when its owner is neither
                    # me nor another project already in the listing.
                    for ob in all_projects:
                        if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                            roots.append(ob['uuid'])
                            root_owners.add(ob['owner_uuid'])

                    # Fetch the owners' user/group records so the top-level
                    # entries can be given human-readable names.
                    lusers = arvados.util.keyset_list_all(
                        self.api.users().list,
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)]])
                    lgroups = arvados.util.keyset_list_all(
                        self.api.groups().list,
                        num_retries=self.num_retries,
                        filters=[['uuid','in', list(root_owners)+roots]])
                            objects[l["uuid"]] = l
                            objects[l["uuid"]] = l

                # Name each entry: groups by their name, users by
                # "first_name last_name".
                for r in root_owners:
                            contents[obr["name"]] = obr
                        elif "first_name" in obr:
                            contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
                        # Roots whose owner record is unknown are listed
                        # directly under their own name.
                        if obr['owner_uuid'] not in objects:
                            contents[obr["name"]] = obr

            # end with llfuse.lock_released, re-acquire lock

            # Reconcile the computed `contents` with the existing entries,
            # creating a ProjectDirectory child per shared item/owner.
            self.merge(contents.items(),
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                                  i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
            # Best-effort refresh: log the failure rather than crash the mount.
            _logger.exception("arv-mount shared dir error")
            self._updating_lock.release()
1300 def want_event_subscribe(self):