1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# Module-level logger for the FUSE driver; shared by all classes below.
_logger = logging.getLogger('arvados.arvados_fuse')

# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    NOTE(review): portions of this class appear to be elided in this
    view of the file; fragments whose enclosing statements are missing
    are marked below and should be restored from the full source.
    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write):
        """parent_inode is the integer inode number"""

        super(Directory, self).__init__()

        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.apiconfig = apiconfig
        self._mtime = time.time()
        self._enable_write = enable_write

    def forward_slash_subst(self):
        # Lazily fetch and cache the cluster's configured substitution
        # string for '/' in collection filenames.
        if not hasattr(self, '_fsns'):
            config = self.apiconfig()
            self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            # old API server with no FSNS config
            # NOTE(review): the fallback suite and return value are elided here.
            if self._fsns == '' or self._fsns == '/':

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.api_config."""
        # '.' and '..' are not reachable if API server is newer than #6277
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            dirty = dirty.replace('/', fsns)
        return _disallowed_filename_characters.sub('_', dirty)

    # Overridden by subclasses to implement logic to update the
    # entries dict when the directory is stale

    # Only used when computing the size of the disk footprint of the directory

    def checkupdate(self):
        # Refresh stale contents, tolerating transient API errors.
        # NOTE(review): the try suite this handler belongs to is elided.
        except apiclient.errors.HttpError as e:

    def __getitem__(self, item):
        return self._entries[item]

    # NOTE(review): fragment of items(); its def line is elided.
        return list(self._entries.items())

    def __contains__(self, k):
        return k in self._entries

    # NOTE(review): fragment of __len__(); its def line is elided.
        return len(self._entries)

    # NOTE(review): fragments of fresh(); its def line is elided.
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.
        """

        oldentries = self._entries
        # NOTE(review): the loop over 'items' is elided; fragments follow.
        name = self.sanitize_filename(fn(i))
        if name in oldentries and same(oldentries[name], i):
            # move existing directory entry over
            self._entries[name] = oldentries[name]
        _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
        # create new directory entry
        self._entries[name] = self.inodes.add_entry(ent)
        # delete any other directory entries that were not in found in 'items'
        _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
        self.inodes.invalidate_entry(self, i)
        self.inodes.del_entry(oldentries[i])
        self.inodes.invalidate_inode(self)
        self._mtime = time.time()

    # NOTE(review): fragment of in_use(); its def line and loop body elided.
        if super(Directory, self).in_use():
            for v in self._entries.values():

    def has_ref(self, only_children):
        # True if this entry (or, unless only_children, any child) is referenced.
        if super(Directory, self).has_ref(only_children):
            for v in self._entries.values():

    # NOTE(review): fragments of clear(); its def line is elided.
        """Delete all entries"""
        oldentries = self._entries
        oldentries[n].clear()
        self.inodes.del_entry(oldentries[n])

    def kernel_invalidate(self):
        # Invalidating the dentry on the parent implies invalidating all paths
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in parent._entries.items():
            self.inodes.invalidate_entry(parent, k)

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on

    Most operations act only the underlying Arvados `Collection` object. The
    `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified. FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.

    NOTE(review): some lines of this class appear elided in this view.
    """

    def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
        self.apiconfig = apiconfig
        # Underlying arvados collection object backing this directory.
        self.collection = collection
        # Root directory object, used to flush changes to the backend.
        self.collection_root = collection_root
        # Lazily-created '.arvados#collection' record file, if any.
        self.collection_record_file = None

    def new_entry(self, name, item, mtime):
        # Create (or re-attach) the FUSE entry for 'item' under 'name'.
        name = self.sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            # Re-use an existing (dead) inode entry for this item.
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            # Subcollection: make a nested directory.
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item, self.collection_root))
            self._entries[name].populate(mtime)
        # NOTE(review): the branch guarding the plain-file case appears elided.
        self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        # These are events from the Collection object (ADD/DEL/MOD)
        # emitted by operations on the Collection object (like
        # "mkdirs" or "remove"), and by "update", which we need to
        # synchronize with our FUSE objects that are assigned inodes.
        if collection == self.collection:
            name = self.sanitize_filename(name)

            # It's possible for another thread to have llfuse.lock and
            # be waiting on collection.lock. Meanwhile, we released
            # llfuse.lock earlier in the stack, but are still holding
            # on to the collection lock, and now we need to re-acquire
            # llfuse.lock. If we don't release the collection lock,
            # we'll deadlock where we're holding the collection lock
            # waiting for llfuse.lock and the other thread is holding
            # llfuse.lock and waiting for the collection lock.

            # The correct locking order here is to take llfuse.lock
            # first, then the collection lock.

            # Since collection.lock is an RLock, it might be locked
            # multiple times, so we need to release it multiple times,
            # keep a count, then re-lock it the correct number of
            # NOTE(review): the release-count loop is elided here.
            self.collection.lock.release()

            with self.collection.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    ent = self._entries[name]
                    del self._entries[name]
                    self.inodes.invalidate_entry(self, name)
                    self.inodes.del_entry(ent)
                elif event == arvados.collection.MOD:
                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                        self.inodes.invalidate_inode(item.fuse_entry)
                    elif name in self._entries:
                        self.inodes.invalidate_inode(self._entries[name])

                if self.collection_record_file is not None:
                    self.collection_record_file.invalidate()
                    self.inodes.invalidate_inode(self.collection_record_file)

            # Re-acquire the collection lock released above.
            self.collection.lock.acquire()

    def populate(self, mtime):
        # Subscribe to collection events and build entries for current items.
        with self.collection.lock:
            self.collection.subscribe(self.on_event)
            for entry, item in self.collection.items():
                self.new_entry(entry, item, self.mtime())

    # NOTE(review): fragment of writable(); its def line is elided.
        return self._enable_write and self.collection.writable()

    # NOTE(review): fragment of flush(); its def line is elided.
        self.collection_root.flush()

    def create(self, name):
        # Create an empty file in the collection.
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    def mkdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    def unlink(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)

    def rmdir(self, name):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        with llfuse.lock_released:
            self.collection.remove(name)

    def rename(self, name_old, name_new, src):
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)

        # Renames only supported between collection-backed directories.
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        # NOTE(review): lookup of 'ent' (source) and 'tgt' (target) is elided.
        if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
        elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
            raise llfuse.FUSEError(errno.ENOTDIR)
        elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)

    # NOTE(review): fragments of clear(); its def line is elided.
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection.

    NOTE(review): some lines of this class appear elided in this view.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
        self.num_retries = num_retries

        # NOTE(review): the try/except around the discovery lookup is elided;
        # the debug line and 60*60 fallback below belong to its except branch.
        self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
        self._poll_time = 60*60

        if isinstance(collection_record, dict):
            # Full record supplied: take locator and mtime from it.
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        # NOTE(review): 'else:' elided — bare locator string case follows.
        self.collection_locator = collection_record
        self._manifest_size = 0
        if self.collection_locator:
            # Only UUID-addressed collections are writable (PDHs are immutable).
            self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
        self._updating_lock = threading.Lock()

    # NOTE(review): fragment of same(); its def line is elided.
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    # NOTE(review): fragment of writable(); its def line is elided.
        return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)

    # NOTE(review): fragments of flush(); its def line is elided.
        if not self.writable():
        with llfuse.lock_released:
            with self._updating_lock:
                if self.collection.committed():
                    self.collection.update()
                self.collection.save()
                self.new_collection_record(self.collection.api_response())

    def want_event_subscribe(self):
        # Only UUID-addressed collections can change and need event updates.
        return (uuid_pattern.match(self.collection_locator) is not None)

    def new_collection(self, new_collection_record, coll_reader):
        # Adopt a freshly fetched collection object and rebuild entries.
        self.collection = coll_reader
        self.new_collection_record(new_collection_record)
        self.populate(self.mtime())

    def new_collection_record(self, new_collection_record):
        # Update cached metadata from a new API record.
        if not new_collection_record:
            raise Exception("invalid new_collection_record")
        self._mtime = convertTime(new_collection_record.get('modified_at'))
        self._manifest_size = len(new_collection_record["manifest_text"])
        self.collection_locator = new_collection_record["uuid"]
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record file", self)

    # NOTE(review): fragment of uuid(); its def line is elided.
        return self.collection_locator

    # NOTE(review): fragments of update(); def line and try structure elided.
        if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
            # It's immutable, nothing to update
        if self.collection_locator is None:
            # No collection locator to retrieve from
        new_collection_record = None
        with llfuse.lock_released:
            self._updating_lock.acquire()
            _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
            if self.collection is not None:
                # Already have a collection object
                self.collection.update()
                new_collection_record = self.collection.api_response()
            # Create a new collection object
            if uuid_pattern.match(self.collection_locator):
                coll_reader = arvados.collection.Collection(
                    self.collection_locator, self.api, self.api.keep,
                    num_retries=self.num_retries,
                    get_threads=(self.api.keep.block_cache.cache_max // 64 * 1024 * 1024) )
            # NOTE(review): 'else:' elided before the read-only reader case.
            coll_reader = arvados.collection.CollectionReader(
                self.collection_locator, self.api, self.api.keep,
                num_retries=self.num_retries,
                get_threads=(self.api.keep.block_cache.cache_max // 64 * 1024 * 1024)
            new_collection_record = coll_reader.api_response() or {}
            # If the Collection only exists in Keep, there will be no API
            # response. Fill in the fields we need.
            if 'uuid' not in new_collection_record:
                new_collection_record['uuid'] = self.collection_locator
            if "portable_data_hash" not in new_collection_record:
                new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
            if 'manifest_text' not in new_collection_record:
                new_collection_record['manifest_text'] = coll_reader.manifest_text()
            if 'storage_classes_desired' not in new_collection_record:
                new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()

        # end with llfuse.lock_released, re-acquire lock

        if new_collection_record is not None:
            if coll_reader is not None:
                self.new_collection(new_collection_record, coll_reader)
            self.new_collection_record(new_collection_record)

            self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if new_collection_record is not None and "manifest_text" in new_collection_record:
                _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
        # NOTE(review): 'except Exception:' header elided before these lines.
        _logger.exception("arv-mount %s: error", self.collection_locator)
        if new_collection_record is not None and "manifest_text" in new_collection_record:
            _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])

    def collection_record(self):
        # NOTE(review): synchronization decorators appear elided.
        return self.collection.api_response()

    def __getitem__(self, item):
        # '.arvados#collection' exposes the API record as a JSON file.
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
                self.invalidate()  # use lookup as a signal to force update
            return self.collection_record_file
        return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            # NOTE(review): 'return True' elided.
        return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        if self.collection_record_file is not None:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
        super(CollectionDirectory, self).invalidate()

    # NOTE(review): fragment of persisted(); its def line is elided.
        return (self.collection_locator is not None)

    # NOTE(review): fragment of objsize(); its def line is elided.
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata. Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    # NOTE(review): fragments of finalize(); its def line is elided.
        if self.collection is not None:
            self.collection.save()
            self.collection.stop_threads()

    # NOTE(review): fragments of clear(); its def line is elided.
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This supports using Keep as scratch space. A userspace program can
    read the .arvados#collection file to get a current manifest in
    order to save a snapshot of the scratch data or use it as a crunch

    NOTE(review): the tail of this docstring and other lines appear
    elided in this view.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        # NOTE(review): body elided in this view.

    def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
        collection = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        # This is always enable_write=True because it never tries to
        # save to the backend
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, True, collection, self)
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        # Propagate collection events, then refresh the record file.
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if self.collection_record_file is None:
            # NOTE(review): 'return' suite elided.

        # See discussion in CollectionDirectoryBase.on_event
        # NOTE(review): the release-count loop is elided here.
        self.collection.lock.release()

        with self.collection.lock:
            self.collection_record_file.invalidate()
            self.inodes.invalidate_inode(self.collection_record_file)
            _logger.debug("%s invalidated collection record", self)

        # Re-acquire the collection lock released above.
        self.collection.lock.acquire()

    def collection_record(self):
        # Build an on-the-fly record from the in-memory collection.
        with llfuse.lock_released:
            # NOTE(review): the dict literal opening (and keys before these)
            # appears elided.
            "manifest_text": self.collection.manifest_text(),
            "portable_data_hash": self.collection.portable_data_hash(),
            "storage_classes_desired": self.collection.storage_classes_desired(),

    def __contains__(self, k):
        return (k == '.arvados#collection' or
                super(TmpCollectionDirectory, self).__contains__(k))

    def __getitem__(self, item):
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = FuncToJSONFile(
                    self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        return super(TmpCollectionDirectory, self).__getitem__(item)

    def want_event_subscribe(self):
        # NOTE(review): return value elided; the next line appears to be a
        # fragment of finalize().
        self.collection.stop_threads()

    def invalidate(self):
        if self.collection_record_file:
            self.collection_record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    This directory provides access to Arvados collections as subdirectories listed
    by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
    the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
    (in the form 'zzzzz-j7d0g-1234567890abcde').
    Note that this directory will appear empty until you attempt to access a
    specific collection or project subdirectory (such as trying to 'cd' into it),
    at which point the collection or project will actually be looked up on the server
    and the directory will appear if it exists.
    """

    # NOTE(review): the README_TEXT assignment (referenced below) appears
    # elided in this view; part of its text has been folded into the class
    # docstring above and should be restored from the full source.

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
            (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, self._enable_write,

    def __contains__(self, k):
        # Lookup doubles as on-demand loading of a collection or project.
        if k in self._entries:
            # NOTE(review): 'return True' elided.
        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            # NOTE(review): 'return False' elided.
        # NOTE(review): 'try:' header elided before the lookups below.
        if group_uuid_pattern.match(k):
            project = self.api.groups().list(
                filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
            if project[u'items_available'] == 0:
                # NOTE(review): 'return False' elided.
            e = self.inodes.add_entry(ProjectDirectory(
                self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                project[u'items'][0], storage_classes=self.storage_classes))
        # NOTE(review): 'else:' elided — collection case follows.
        e = self.inodes.add_entry(CollectionDirectory(
            self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))

        if k not in self._entries:
            self.inodes.del_entry(e)
        self.inodes.invalidate_entry(self, k)
        self.inodes.del_entry(e)
        except Exception as ex:
            _logger.exception("arv-mount lookup '%s':", k)
            self.inodes.del_entry(e)

    def __getitem__(self, item):
        # NOTE(review): the 'if item in self:' guard appears elided.
        return self._entries[item]
        raise KeyError("No collection with id " + item)

    def want_event_subscribe(self):
        return not self.pdh_only
class TagsDirectory(Directory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        self._poll_time = poll_time

    def want_event_subscribe(self):
        # NOTE(review): return value elided.

    # NOTE(review): fragments of update(); its def line is elided.
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
                select=['name'], distinct=True, limit=1000
            ).execute(num_retries=self.num_retries)
        # Merge server-side tags with any looked-up extras (self._extra).
        self.merge(tags['items']+[{"name": n} for n in self._extra],
                   lambda a, i: a.tag == i['name'],
                   lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                          i['name'], poll=self._poll, poll_time=self._poll_time))

    def __getitem__(self, item):
        # Fall back to an API lookup for tags not yet in the listing.
        if super(TagsDirectory, self).__contains__(item):
            return super(TagsDirectory, self).__getitem__(item)
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            ).execute(num_retries=self.num_retries)
        # NOTE(review): the guard on a non-empty result appears elided.
        self._extra.add(item)
        return super(TagsDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if super(TagsDirectory, self).__contains__(k):
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        self._poll_time = poll_time

    def want_event_subscribe(self):
        # NOTE(review): return value elided.

    # NOTE(review): fragments of update(); its def line is elided.
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
            ).execute(num_retries=self.num_retries)
        self.merge(taggedcollections['items'],
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']))
906 class ProjectDirectory(Directory):
907 """A special directory that contains the contents of a project."""
909 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
910 poll=True, poll_time=3, storage_classes=None):
911 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
913 self.num_retries = num_retries
914 self.project_object = project_object
915 self.project_object_file = None
916 self.project_uuid = project_object['uuid']
918 self._poll_time = poll_time
919 self._updating_lock = threading.Lock()
920 self._current_user = None
921 self._full_listing = False
922 self.storage_classes = storage_classes
924 def want_event_subscribe(self):
927 def createDirectory(self, i):
928 if collection_uuid_pattern.match(i['uuid']):
929 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
930 elif group_uuid_pattern.match(i['uuid']):
931 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
932 i, self._poll, self._poll_time, self.storage_classes)
933 elif link_uuid_pattern.match(i['uuid']):
934 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
935 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
938 elif uuid_pattern.match(i['uuid']):
939 return ObjectFile(self.parent_inode, i)
944 return self.project_uuid
947 self._full_listing = True
948 return super(ProjectDirectory, self).items()
952 if i['name'] is None or len(i['name']) == 0:
954 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
955 # collection or subproject
957 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
960 elif 'kind' in i and i['kind'].startswith('arvados#'):
962 return "{}.{}".format(i['name'], i['kind'][8:])
969 if self.project_object_file == None:
970 self.project_object_file = ObjectFile(self.inode, self.project_object)
971 self.inodes.add_entry(self.project_object_file)
973 if not self._full_listing:
977 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
978 return a.uuid() == i['uuid']
979 elif isinstance(a, ObjectFile):
980 return a.uuid() == i['uuid'] and not a.stale()
984 with llfuse.lock_released:
985 self._updating_lock.acquire()
989 if group_uuid_pattern.match(self.project_uuid):
990 self.project_object = self.api.groups().get(
991 uuid=self.project_uuid).execute(num_retries=self.num_retries)
992 elif user_uuid_pattern.match(self.project_uuid):
993 self.project_object = self.api.users().get(
994 uuid=self.project_uuid).execute(num_retries=self.num_retries)
995 # do this in 2 steps until #17424 is fixed
996 contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
998 num_retries=self.num_retries,
999 uuid=self.project_uuid,
1000 filters=[["uuid", "is_a", "arvados#group"],
1001 ["groups.group_class", "in", ["project","filter"]]]))
1002 contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"],
1003 arvados.util.keyset_list_all(self.api.groups().contents,
1005 num_retries=self.num_retries,
1006 uuid=self.project_uuid,
1007 filters=[["uuid", "is_a", "arvados#collection"]])))
1010 # end with llfuse.lock_released, re-acquire lock
1012 self.merge(contents,
1015 self.createDirectory)
1018 self._updating_lock.release()
1020 def _add_entry(self, i, name):
1021 ent = self.createDirectory(i)
1022 self._entries[name] = self.inodes.add_entry(ent)
1023 return self._entries[name]
1027 def __getitem__(self, k):
1028 if k == '.arvados#project':
1029 return self.project_object_file
1030 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1031 return super(ProjectDirectory, self).__getitem__(k)
1032 with llfuse.lock_released:
1033 k2 = self.unsanitize_filename(k)
1035 namefilter = ["name", "=", k]
1037 namefilter = ["name", "in", [k, k2]]
1038 contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
1039 ["group_class", "in", ["project","filter"]],
1041 limit=2).execute(num_retries=self.num_retries)["items"]
1043 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
1045 limit=2).execute(num_retries=self.num_retries)["items"]
1047 if len(contents) > 1 and contents[1]['name'] == k:
1048 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1050 contents = [contents[1]]
1051 name = self.sanitize_filename(self.namefn(contents[0]))
1054 return self._add_entry(contents[0], name)
1059 def __contains__(self, k):
1060 if k == '.arvados#project':
1072 if not self._enable_write:
1074 with llfuse.lock_released:
1075 if not self._current_user:
1076 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1077 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1079 def persisted(self):
1084 def mkdir(self, name):
1085 if not self.writable():
1086 raise llfuse.FUSEError(errno.EROFS)
1089 with llfuse.lock_released:
1091 "owner_uuid": self.project_uuid,
1093 "manifest_text": "" }
1094 if self.storage_classes is not None:
1095 c["storage_classes_desired"] = self.storage_classes
1097 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1098 except Exception as e:
1101 except apiclient_errors.Error as error:
1102 _logger.error(error)
1103 raise llfuse.FUSEError(errno.EEXIST)
1107 def rmdir(self, name):
1108 if not self.writable():
1109 raise llfuse.FUSEError(errno.EROFS)
1111 if name not in self:
1112 raise llfuse.FUSEError(errno.ENOENT)
1113 if not isinstance(self[name], CollectionDirectory):
1114 raise llfuse.FUSEError(errno.EPERM)
1115 if len(self[name]) > 0:
1116 raise llfuse.FUSEError(errno.ENOTEMPTY)
1117 with llfuse.lock_released:
1118 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
    def rename(self, name_old, name_new, src):
        """Move the collection `name_old` from project directory `src`
        into this project under the name `name_new`.

        Implements the FUSE rename operation between project
        directories.  Raises EROFS when this project is not writable,
        and EPERM when the source is not a project directory, the
        entry being moved is not a collection, or the target name is
        already taken (see the comment below on replace semantics).

        NOTE(review): the line assigning `ent` (the source directory
        entry being moved, presumably looked up in `src`) is elided
        from this excerpt.
        """
        if not self.writable():
            raise llfuse.FUSEError(errno.EROFS)
        if not isinstance(src, ProjectDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if not isinstance(ent, CollectionDirectory):
            raise llfuse.FUSEError(errno.EPERM)
        if name_new in self:
            # POSIX semantics for replacing one directory with another is
            # tricky (the target directory must be empty, the operation must be
            # atomic which isn't possible with the Arvados API as of this
            # writing) so don't support that.
            raise llfuse.FUSEError(errno.EPERM)
        # Reparent and rename the collection on the API server in a
        # single update call.
        self.api.collections().update(uuid=ent.uuid(),
                                      body={"owner_uuid": self.uuid(),
                                            "name": name_new}).execute(num_retries=self.num_retries)

        # Actually move the entry from source directory to this directory.
        del src._entries[name_old]
        self._entries[name_new] = ent
        self.inodes.invalidate_entry(src, name_old)
    def child_event(self, ev):
        """Handle a websocket event for an object belonging to (or moved
        into/out of) this project, updating the local directory entry
        to match the server-side change.

        `ev` is an event record carrying `object_uuid` and, in its
        `properties`, the object's old/new attribute dicts.

        NOTE(review): this excerpt is elided — the bodies of the early
        guard branches (returns) and parts of the add/rename/delete
        handling are not visible here, so the control flow below is
        incomplete as shown.
        """
        properties = ev.get("properties") or {}
        old_attrs = properties.get("old_attributes") or {}
        new_attrs = properties.get("new_attributes") or {}
        old_attrs["uuid"] = ev["object_uuid"]
        new_attrs["uuid"] = ev["object_uuid"]
        # Map the object's Arvados name to the name it carries in the
        # mount (disallowed characters / forward slashes substituted).
        old_name = self.sanitize_filename(self.namefn(old_attrs))
        new_name = self.sanitize_filename(self.namefn(new_attrs))

        # create events will have a new name, but not an old name
        # delete events will have an old name, but not a new name
        # update events will have an old and new name, and they may be same or different
        # if they are the same, an unrelated field changed and there is nothing to do.

        if old_attrs.get("owner_uuid") != self.project_uuid:
            # Was moved from somewhere else, so don't try to remove entry.
        if ev.get("object_owner_uuid") != self.project_uuid:
            # Was moved to somewhere else, so don't try to add entry
        if old_attrs.get("is_trashed"):
            # Was previously deleted
        if new_attrs.get("is_trashed"):
        # Name changed (or entry appeared/disappeared): move/add the
        # local entry and invalidate the kernel's cached lookup.
        if new_name != old_name:
            if old_name in self._entries:
                ent = self._entries[old_name]
                del self._entries[old_name]
                self.inodes.invalidate_entry(self, old_name)
            self._entries[new_name] = ent
            self._add_entry(new_attrs, new_name)
        elif ent is not None:
            self.inodes.del_entry(ent)
1196 class SharedDirectory(Directory):
1197 """A special directory that represents users or groups who have shared projects with me."""
    def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
                 poll=False, poll_time=60, storage_classes=None):
        """Set up the shared-projects directory.

        parent_inode: integer inode number of the parent directory.
        inodes: the mount's inode table.
        api: Arvados API client; its config is passed to Directory.
        num_retries: retry count for API calls.
        enable_write: whether the mount allows modification.
        exclude: owner to exclude from the listing (assumed; the
            assignment using it is elided from this excerpt).
        poll/poll_time: polling behavior for refreshing contents.
        storage_classes: desired storage classes passed down to
            ProjectDirectory children.

        NOTE(review): several attribute assignments (e.g. storing
        `api` and the poll settings besides `_poll_time`) are elided
        from this excerpt.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
        self.num_retries = num_retries
        # Cache the record of the user this mount is authenticated as;
        # used later to decide which projects count as "shared with me".
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll_time = poll_time
        # Serializes concurrent update() calls.
        self._updating_lock = threading.Lock()
        self.storage_classes = storage_classes
        # Interior of SharedDirectory.update() (its `def` line is not
        # visible in this excerpt): rebuild the directory contents as
        # one entry per user/group that has shared projects with the
        # current user.
        #
        # NOTE(review): heavily elided — the enclosing try/finally, the
        # initializations of `contents`/`roots`/`root_owners`/`objects`
        # /`page`, the paging loop, and several conditionals are not
        # visible here.
        with llfuse.lock_released:
            self._updating_lock.acquire()
            # Skip the (expensive) refresh if our cached contents are
            # still fresh.
            if not self.stale():

            # Prefer the groups().shared endpoint when the API server
            # advertises it in its discovery document...
            methods = self.api._rootDesc.get('resources')["groups"]['methods']
            if 'httpMethod' in methods.get('shared', {}):
                    resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
                                                    include="owner_uuid").execute()
                    if not resp["items"]:
                    # Keyset pagination: next page starts after the last
                    # uuid seen on this page.
                    page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
                    for r in resp["items"]:
                        objects[r["uuid"]] = r
                        roots.append(r["uuid"])
                    # Owners come back in "included" thanks to
                    # include="owner_uuid".
                    for r in resp["included"]:
                        objects[r["uuid"]] = r
                        root_owners.add(r["uuid"])

                # ...otherwise fall back to listing every readable
                # project and computing the share roots client-side.
                all_projects = list(arvados.util.keyset_list_all(
                    self.api.groups().list,
                    num_retries=self.num_retries,
                    filters=[['group_class','in',['project','filter']]],
                    select=["uuid", "owner_uuid"]))
                for ob in all_projects:
                    objects[ob['uuid']] = ob

                current_uuid = self.current_user['uuid']
                # A project is a "root" share if it isn't owned by me
                # and its owner isn't itself a project I can see.
                for ob in all_projects:
                    if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
                        roots.append(ob['uuid'])
                        root_owners.add(ob['owner_uuid'])

                # Fetch the user and group records for all root owners
                # so we can name the top-level entries.
                lusers = arvados.util.keyset_list_all(
                    self.api.users().list,
                    num_retries=self.num_retries,
                    filters=[['uuid','in', list(root_owners)]])
                lgroups = arvados.util.keyset_list_all(
                    self.api.groups().list,
                    num_retries=self.num_retries,
                    filters=[['uuid','in', list(root_owners)+roots]])
                    objects[l["uuid"]] = l
                    objects[l["uuid"]] = l

            # Build the name -> object mapping: groups by name, users
            # by "First Last".
            for r in root_owners:
                    contents[obr["name"]] = obr
                elif "first_name" in obr:
                    contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
                # Root projects whose owner record we can't see are
                # listed directly under their own name.
                if obr['owner_uuid'] not in objects:
                    contents[obr["name"]] = obr

        # end with llfuse.lock_released, re-acquire lock

        # Reconcile the freshly computed contents with the existing
        # directory entries, creating ProjectDirectory children for
        # new items.
        self.merge(contents.items(),
                   lambda a, i: a.uuid() == i[1]['uuid'],
                   lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
                                              i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
            _logger.exception("arv-mount shared dir error")
            # Always release, even when the refresh failed.
            self._updating_lock.release()
1299 def want_event_subscribe(self):