1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
15 from apiclient import errors as apiclient_errors
17 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
18 from .fresh import FreshBase, convertTime, use_counter, check_update
20 import arvados.collection
21 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
23 _logger = logging.getLogger('arvados.arvados_fuse')
26 # Match any character which FUSE or Linux cannot accommodate as part
27 # of a filename. (If present in a collection filename, they will
28 # appear as underscores in the fuse mount.)
29 _disallowed_filename_characters = re.compile(r'[\x00/]')
32 class Directory(FreshBase):
33     """Generic directory object, backed by a dict.
35     Consists of a set of entries with the key representing the filename
36     and the value referencing a File or Directory object.
# NOTE(review): this source view is elided (original line numbers skip);
# comments below describe only the visible fragments.
39     def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters):
40         """parent_inode is the integer inode number"""
42 super(Directory, self).__init__()
# FUSE inode numbers are plain ints; fail fast on anything else.
45 if not isinstance(parent_inode, int):
46 raise Exception("parent_inode should be an int")
47 self.parent_inode = parent_inode
49 self.apiconfig = apiconfig
51 self._mtime = time.time()
52 self._enable_write = enable_write
53 self._filters = filters or []
# Yield only the stored filters that apply to `subtype`. Filter attributes
# may be qualified as "<type>.<attr>"; mismatched types are skipped, and
# matched filters are re-emitted with the bare attribute name.
55     def _filters_for(self, subtype, *, qualified):
56 for f in self._filters:
57 f_type, _, f_name = f[0].partition('.')
60 elif f_type != subtype:
65 yield [f_name, *f[1:]]
# Lazily fetch Collections.ForwardSlashNameSubstitution from the cluster
# config and cache it on the instance as self._fsns.
67     def forward_slash_subst(self):
68 if not hasattr(self, '_fsns'):
70 config = self.apiconfig()
72 self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
74                 # old API server with no FSNS config
# An empty string or '/' substitution effectively disables the feature.
77 if self._fsns == '' or self._fsns == '/':
81     def unsanitize_filename(self, incoming):
82         """Replace ForwardSlashNameSubstitution value with /"""
83 fsns = self.forward_slash_subst()
84 if isinstance(fsns, str):
85 return incoming.replace(fsns, '/')
89     def sanitize_filename(self, dirty):
90         """Replace disallowed filename characters according to
91         ForwardSlashNameSubstitution in self.api_config."""
92         # '.' and '..' are not reachable if API server is newer than #6277
102 fsns = self.forward_slash_subst()
103 if isinstance(fsns, str):
104 dirty = dirty.replace('/', fsns)
# Any remaining NUL or '/' characters appear as underscores in the mount.
105 return _disallowed_filename_characters.sub('_', dirty)
108     # Overridden by subclasses to implement logic to update the
109     # entries dict when the directory is stale
114     # Only used when computing the size of the disk footprint of the directory
# Refresh if stale; HTTP errors from the API client are caught here
# (handling elided in this view).
122     def checkupdate(self):
126         except apiclient.errors.HttpError as e:
131     def __getitem__(self, item):
132 return self._entries[item]
137 return list(self._entries.items())
141     def __contains__(self, k):
142 return k in self._entries
147 return len(self._entries)
150 self.inodes.touch(self)
151 super(Directory, self).fresh()
153     def merge(self, items, fn, same, new_entry):
154         """Helper method for updating the contents of the directory.
156         Takes a list describing the new contents of the directory, reuse
157         entries that are the same in both the old and new lists, create new
158         entries, and delete old entries missing from the new list.
160         :items: iterable with new directory contents
162         :fn: function to take an entry in 'items' and return the desired file or
163         directory name, or None if this entry should be skipped
165         :same: function to compare an existing entry (a File or Directory
166         object) with an entry in the items list to determine whether to keep
169         :new_entry: function to create a new directory entry (File or Directory
170         object) from an entry in the items list.
174 oldentries = self._entries
# sanitize_filename keeps server-side names legal as FUSE entry names.
178 name = self.sanitize_filename(fn(i))
180 if name in oldentries and same(oldentries[name], i):
181                 # move existing directory entry over
182 self._entries[name] = oldentries[name]
185 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
186                 # create new directory entry
189 self._entries[name] = self.inodes.add_entry(ent)
192         # delete any other directory entries that were not in found in 'items'
194 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
195 self.inodes.invalidate_entry(self, i)
196 self.inodes.del_entry(oldentries[i])
200 self.inodes.invalidate_inode(self)
201 self._mtime = time.time()
# The directory counts as in use if the base class says so or if any
# child entry is in use.
206 if super(Directory, self).in_use():
208 for v in self._entries.values():
213     def has_ref(self, only_children):
214 if super(Directory, self).has_ref(only_children):
216 for v in self._entries.values():
222         """Delete all entries"""
223 oldentries = self._entries
226 oldentries[n].clear()
227 self.inodes.del_entry(oldentries[n])
230     def kernel_invalidate(self):
231         # Invalidating the dentry on the parent implies invalidating all paths
233 parent = self.inodes[self.parent_inode]
235         # Find self on the parent in order to invalidate this path.
236         # Calling the public items() method might trigger a refresh,
237         # which we definitely don't want, so read the internal dict directly.
238 for k,v in parent._entries.items():
240 self.inodes.invalidate_entry(parent, k)
# Mutating operations are abstract here; writable subclasses override them.
252     def want_event_subscribe(self):
253 raise NotImplementedError()
255     def create(self, name):
256 raise NotImplementedError()
258     def mkdir(self, name):
259 raise NotImplementedError()
261     def unlink(self, name):
262 raise NotImplementedError()
264     def rmdir(self, name):
265 raise NotImplementedError()
267     def rename(self, name_old, name_new, src):
268 raise NotImplementedError()
271 class CollectionDirectoryBase(Directory):
272     """Represent an Arvados Collection as a directory.
274     This class is used for Subcollections, and is also the base class for
275     CollectionDirectory, which implements collection loading/saving on
278     Most operations act only the underlying Arvados `Collection` object. The
279     `Collection` object signals via a notify callback to
280     `CollectionDirectoryBase.on_event` that an item was added, removed or
281     modified. FUSE inodes and directory entries are created, deleted or
282     invalidated in response to these events.
# NOTE(review): source view is elided (line numbers skip); comments cover
# only the visible fragments.
286     def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters, collection, collection_root):
287 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write, filters)
288 self.apiconfig = apiconfig
289 self.collection = collection
290 self.collection_root = collection_root
291 self.collection_record_file = None
# Create (or reattach) the FUSE entry for one collection item. If the item
# already carries a dead fuse_entry, revive and reparent it instead of
# allocating a new inode; subcollections become nested
# CollectionDirectoryBase instances, other items become FuseArvadosFile.
293     def new_entry(self, name, item, mtime):
294 name = self.sanitize_filename(name)
295 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
296 if item.fuse_entry.dead is not True:
297 raise Exception("Can only reparent dead inode entry")
298 if item.fuse_entry.inode is None:
299 raise Exception("Reparented entry must still have valid inode")
300 item.fuse_entry.dead = False
301 self._entries[name] = item.fuse_entry
302 elif isinstance(item, arvados.collection.RichCollectionBase):
303 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
310 self.collection_root,
312 self._entries[name].populate(mtime)
314 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
315 item.fuse_entry = self._entries[name]
317     def on_event(self, event, collection, name, item):
318         # These are events from the Collection object (ADD/DEL/MOD)
319         # emitted by operations on the Collection object (like
320         # "mkdirs" or "remove"), and by "update", which we need to
321         # synchronize with our FUSE objects that are assigned inodes.
322 if collection == self.collection:
323 name = self.sanitize_filename(name)
326             # It's possible for another thread to have llfuse.lock and
327             # be waiting on collection.lock. Meanwhile, we released
328             # llfuse.lock earlier in the stack, but are still holding
329             # on to the collection lock, and now we need to re-acquire
330             # llfuse.lock. If we don't release the collection lock,
331             # we'll deadlock where we're holding the collection lock
332             # waiting for llfuse.lock and the other thread is holding
333             # llfuse.lock and waiting for the collection lock.
335             # The correct locking order here is to take llfuse.lock
336             # first, then the collection lock.
338             # Since collection.lock is an RLock, it might be locked
339             # multiple times, so we need to release it multiple times,
340             # keep a count, then re-lock it the correct number of
346 self.collection.lock.release()
# With llfuse.lock (re)acquired, retake the collection lock in the
# correct order and mirror the event into the FUSE entry table.
353 with self.collection.lock:
354 if event == arvados.collection.ADD:
355 self.new_entry(name, item, self.mtime())
356 elif event == arvados.collection.DEL:
357 ent = self._entries[name]
358 del self._entries[name]
359 self.inodes.invalidate_entry(self, name)
360 self.inodes.del_entry(ent)
361 elif event == arvados.collection.MOD:
362 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
363 self.inodes.invalidate_inode(item.fuse_entry)
364 elif name in self._entries:
365 self.inodes.invalidate_inode(self._entries[name])
# Any event also invalidates the cached .arvados#collection record file.
367 if self.collection_record_file is not None:
368 self.collection_record_file.invalidate()
369 self.inodes.invalidate_inode(self.collection_record_file)
# Restore the RLock recursion count released above (loop elided).
372 self.collection.lock.acquire()
# Subscribe to collection notifications and build entries for the current
# collection contents.
375     def populate(self, mtime):
377 with self.collection.lock:
378 self.collection.subscribe(self.on_event)
379 for entry, item in self.collection.items():
380 self.new_entry(entry, item, self.mtime())
383 return self._enable_write and self.collection.writable()
# Flushing is delegated to the root collection directory.
387 self.collection_root.flush()
# The mutators below release llfuse.lock around collection operations,
# which may block on network/Keep I/O.
391     def create(self, name):
392 if not self.writable():
393 raise llfuse.FUSEError(errno.EROFS)
394 with llfuse.lock_released:
395 self.collection.open(name, "w").close()
399     def mkdir(self, name):
400 if not self.writable():
401 raise llfuse.FUSEError(errno.EROFS)
402 with llfuse.lock_released:
403 self.collection.mkdirs(name)
407     def unlink(self, name):
408 if not self.writable():
409 raise llfuse.FUSEError(errno.EROFS)
410 with llfuse.lock_released:
411 self.collection.remove(name)
416     def rmdir(self, name):
417 if not self.writable():
418 raise llfuse.FUSEError(errno.EROFS)
419 with llfuse.lock_released:
420 self.collection.remove(name)
# Enforce POSIX rename constraints (non-empty dir, dir-over-file,
# file-over-dir) before delegating to Collection.rename.
425     def rename(self, name_old, name_new, src):
426 if not self.writable():
427 raise llfuse.FUSEError(errno.EROFS)
429 if not isinstance(src, CollectionDirectoryBase):
430 raise llfuse.FUSEError(errno.EPERM)
435 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
437 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
439 raise llfuse.FUSEError(errno.ENOTEMPTY)
440 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
441 raise llfuse.FUSEError(errno.ENOTDIR)
442 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
443 raise llfuse.FUSEError(errno.EISDIR)
445 with llfuse.lock_released:
446 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
451 super(CollectionDirectoryBase, self).clear()
452 self.collection = None
455 class CollectionDirectory(CollectionDirectoryBase):
456     """Represents the root of a directory tree representing a collection."""
# NOTE(review): source view is elided (line numbers skip); comments cover
# only the visible fragments.
458     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
459 super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self)
461 self.num_retries = num_retries
# Poll at half the blob signature TTL so cached signatures stay valid;
# default TTL is 2 hours. Fall back to hourly if discovery lacks the field.
464 self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
466 _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
467 self._poll_time = 60*60
# collection_record may be a full API record (dict) or just a locator
# string (uuid or portable data hash).
469 if isinstance(collection_record, dict):
470 self.collection_locator = collection_record['uuid']
471 self._mtime = convertTime(collection_record.get('modified_at'))
473 self.collection_locator = collection_record
475 self._manifest_size = 0
# Only uuid-addressed (not PDH-addressed) collections can be writable.
476 if self.collection_locator:
477 self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
478 self._updating_lock = threading.Lock()
481 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
484 return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
# Flush: sync with the server and save pending changes, outside llfuse.lock.
488 if not self.writable():
490 with llfuse.lock_released:
491 with self._updating_lock:
492 if self.collection.committed():
493 self.collection.update()
495 self.collection.save()
496 self.new_collection_record(self.collection.api_response())
498     def want_event_subscribe(self):
499 return (uuid_pattern.match(self.collection_locator) is not None)
# Adopt a freshly fetched collection object and populate entries from it.
501     def new_collection(self, new_collection_record, coll_reader):
504 self.collection = coll_reader
505 self.new_collection_record(new_collection_record)
506 self.populate(self.mtime())
# Record bookkeeping for a new/updated API record: mtime, manifest size
# (used by the memory-usage heuristic), locator, and invalidation of the
# cached .arvados#collection file.
508     def new_collection_record(self, new_collection_record):
509 if not new_collection_record:
510 raise Exception("invalid new_collection_record")
511 self._mtime = convertTime(new_collection_record.get('modified_at'))
512 self._manifest_size = len(new_collection_record["manifest_text"])
513 self.collection_locator = new_collection_record["uuid"]
514 if self.collection_record_file is not None:
515 self.collection_record_file.invalidate()
516 self.inodes.invalidate_inode(self.collection_record_file)
517 _logger.debug("%s invalidated collection record file", self)
521 return self.collection_locator
# Update: refresh the collection from the API server.
526 if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
527             # It's immutable, nothing to update
530 if self.collection_locator is None:
531             # No collection locator to retrieve from
535 new_collection_record = None
537 with llfuse.lock_released:
538 self._updating_lock.acquire()
542 _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
544 if self.collection is not None:
545                     # Already have a collection object
546 self.collection.update()
547 new_collection_record = self.collection.api_response()
549                     # Create a new collection object
# uuid locators get a read-write Collection; PDH locators get a
# read-only CollectionReader.
550 if uuid_pattern.match(self.collection_locator):
551 coll_reader = arvados.collection.Collection(
552 self.collection_locator, self.api, self.api.keep,
553 num_retries=self.num_retries)
555 coll_reader = arvados.collection.CollectionReader(
556 self.collection_locator, self.api, self.api.keep,
557 num_retries=self.num_retries)
558 new_collection_record = coll_reader.api_response() or {}
559                     # If the Collection only exists in Keep, there will be no API
560                     # response. Fill in the fields we need.
561 if 'uuid' not in new_collection_record:
562 new_collection_record['uuid'] = self.collection_locator
563 if "portable_data_hash" not in new_collection_record:
564 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
565 if 'manifest_text' not in new_collection_record:
566 new_collection_record['manifest_text'] = coll_reader.manifest_text()
567 if 'storage_classes_desired' not in new_collection_record:
568 new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
570             # end with llfuse.lock_released, re-acquire lock
572 if new_collection_record is not None:
573 if coll_reader is not None:
574 self.new_collection(new_collection_record, coll_reader)
576 self.new_collection_record(new_collection_record)
580 self._updating_lock.release()
# Fetch failures are logged but do not propagate to the kernel here.
581         except arvados.errors.NotFoundError as e:
582 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
583         except arvados.errors.ArgumentError as detail:
584 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
585 if new_collection_record is not None and "manifest_text" in new_collection_record:
586 _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
588 _logger.exception("arv-mount %s: error", self.collection_locator)
589 if new_collection_record is not None and "manifest_text" in new_collection_record:
590 _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
595     def collection_record(self):
597 return self.collection.api_response()
# Expose the API record as a magic '.arvados#collection' JSON file,
# created lazily on first lookup.
601     def __getitem__(self, item):
602 if item == '.arvados#collection':
603 if self.collection_record_file is None:
604 self.collection_record_file = FuncToJSONFile(
605 self.inode, self.collection_record)
606 self.inodes.add_entry(self.collection_record_file)
607 self.invalidate() # use lookup as a signal to force update
608 return self.collection_record_file
610 return super(CollectionDirectory, self).__getitem__(item)
612     def __contains__(self, k):
613 if k == '.arvados#collection':
616 return super(CollectionDirectory, self).__contains__(k)
618     def invalidate(self):
619 if self.collection_record_file is not None:
620 self.collection_record_file.invalidate()
621 self.inodes.invalidate_inode(self.collection_record_file)
622 super(CollectionDirectory, self).invalidate()
625 return (self.collection_locator is not None)
628         # This is an empirically-derived heuristic to estimate the memory used
629         # to store this collection's metadata. Calculating the memory
630         # footprint directly would be more accurate, but also more complicated.
631 return self._manifest_size * 128
# Teardown: save any pending changes, then stop background threads.
634 if self.collection is not None:
636 self.collection.save()
637 self.collection.stop_threads()
640 if self.collection is not None:
641 self.collection.stop_threads()
642 super(CollectionDirectory, self).clear()
643 self._manifest_size = 0
646 class TmpCollectionDirectory(CollectionDirectoryBase):
647     """A directory backed by an Arvados collection that never gets saved.
649     This supports using Keep as scratch space. A userspace program can
650     read the .arvados#collection file to get a current manifest in
651     order to save a snapshot of the scratch data or use it as a crunch
# NOTE(review): source view is elided (line numbers skip); comments cover
# only the visible fragments.
# Collection subclass that presumably suppresses persistence (its body is
# elided here) — TODO confirm save/save_new are overridden as no-ops.
655     class UnsaveableCollection(arvados.collection.Collection):
661     def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
662 collection = self.UnsaveableCollection(
663 api_client=api_client,
664 keep_client=api_client.keep,
665 num_retries=num_retries,
666 storage_classes_desired=storage_classes)
667         # This is always enable_write=True because it never tries to
668         # save to the backend
669 super(TmpCollectionDirectory, self).__init__(
670 parent_inode, inodes, api_client.config, True, filters, collection, self)
671 self.populate(self.mtime())
# After the base class mirrors the event, also invalidate the generated
# .arvados#collection record file, using the same lock-ordering dance.
673     def on_event(self, *args, **kwargs):
674 super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
675 if self.collection_record_file is None:
678             # See discussion in CollectionDirectoryBase.on_event
682 self.collection.lock.release()
689 with self.collection.lock:
690 self.collection_record_file.invalidate()
691 self.inodes.invalidate_inode(self.collection_record_file)
692 _logger.debug("%s invalidated collection record", self)
695 self.collection.lock.acquire()
# Synthesize an in-memory "API record" from the live collection; there is
# no server-side record because this collection is never saved.
698     def collection_record(self):
699 with llfuse.lock_released:
702 "manifest_text": self.collection.manifest_text(),
703 "portable_data_hash": self.collection.portable_data_hash(),
704 "storage_classes_desired": self.collection.storage_classes_desired(),
707     def __contains__(self, k):
708 return (k == '.arvados#collection' or
709 super(TmpCollectionDirectory, self).__contains__(k))
# Lazily create the .arvados#collection JSON file on first lookup.
712     def __getitem__(self, item):
713 if item == '.arvados#collection':
714 if self.collection_record_file is None:
715 self.collection_record_file = FuncToJSONFile(
716 self.inode, self.collection_record)
717 self.inodes.add_entry(self.collection_record_file)
718 return self.collection_record_file
719 return super(TmpCollectionDirectory, self).__getitem__(item)
730     def want_event_subscribe(self):
734 self.collection.stop_threads()
736     def invalidate(self):
737 if self.collection_record_file:
738 self.collection_record_file.invalidate()
739 super(TmpCollectionDirectory, self).invalidate()
742 class MagicDirectory(Directory):
743     """A special directory that logically contains the set of all extant keep locators.
745     When a file is referenced by lookup(), it is tested to see if it is a valid
746     keep locator to a manifest, and if so, loads the manifest contents as a
747     subdirectory of this directory with the locator as the directory name.
748     Since querying a list of all extant keep locators is impractical, only
749     collections that have already been accessed are visible to readdir().
# NOTE(review): source view is elided (line numbers skip); comments cover
# only the visible fragments. The text below is part of README_TEXT
# shown to users inside the mount.
754 This directory provides access to Arvados collections as subdirectories listed
755 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
756 the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
757 (in the form 'zzzzz-j7d0g-1234567890abcde').
759 Note that this directory will appear empty until you attempt to access a
760 specific collection or project subdirectory (such as trying to 'cd' into it),
761 at which point the collection or project will actually be looked up on the server
762 and the directory will appear if it exists.
766     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
767 super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
769 self.num_retries = num_retries
770 self.pdh_only = pdh_only
771 self.storage_classes = storage_classes
# Hook attribute assignment: the README (and by_id alias at the mount
# root) can only be created once this directory has been given an inode.
773     def __setattr__(self, name, value):
774 super(MagicDirectory, self).__setattr__(name, value)
775         # When we're assigned an inode, add a README.
776 if ((name == 'inode') and (self.inode is not None) and
777 (not self._entries)):
778 self._entries['README'] = self.inodes.add_entry(
779 StringFile(self.inode, self.README_TEXT, time.time()))
780         # If we're the root directory, add an identical by_id subdirectory.
781 if self.inode == llfuse.ROOT_INODE:
782 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
# Membership test doubles as on-demand lookup: an unknown key that looks
# like a PDH/uuid is fetched from the API and, on success, becomes a
# ProjectDirectory or CollectionDirectory entry.
792     def __contains__(self, k):
793 if k in self._entries:
# Reject keys that match neither a portable data hash nor (when allowed)
# an object uuid.
796 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
802 if group_uuid_pattern.match(k):
803 project = self.api.groups().list(
805 ['group_class', 'in', ['project','filter']],
807 *self._filters_for('groups', qualified=False),
809 ).execute(num_retries=self.num_retries)
810 if project[u'items_available'] == 0:
812 e = self.inodes.add_entry(ProjectDirectory(
819 project[u'items'][0],
820 storage_classes=self.storage_classes,
823 e = self.inodes.add_entry(CollectionDirectory(
# On failure paths the speculative entry is discarded again.
834 if k not in self._entries:
837 self.inodes.del_entry(e)
840 self.inodes.invalidate_entry(self, k)
841 self.inodes.del_entry(e)
843         except Exception as ex:
844 _logger.exception("arv-mount lookup '%s':", k)
846 self.inodes.del_entry(e)
849     def __getitem__(self, item):
851 return self._entries[item]
853 raise KeyError("No collection with id " + item)
858     def want_event_subscribe(self):
859 return not self.pdh_only
862 class TagsDirectory(Directory):
863     """A special directory that contains as subdirectories all tags visible to the user."""
# NOTE(review): source view is elided (line numbers skip); comments cover
# only the visible fragments.
865     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
866 super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
868 self.num_retries = num_retries
870 self._poll_time = poll_time
873     def want_event_subscribe(self):
# Update: list tag links from the API and merge them (plus any explicitly
# looked-up names in self._extra) into TagDirectory entries.
878 with llfuse.lock_released:
879 tags = self.api.links().list(
881 ['link_class', '=', 'tag'],
883 *self._filters_for('links', qualified=False),
888 ).execute(num_retries=self.num_retries)
891 tags['items']+[{"name": n} for n in self._extra],
893 lambda a, i: a.tag == i['name'],
894 lambda i: TagDirectory(
903 poll_time=self._poll_time,
# Lookup of a tag not in the cached listing queries the API directly and,
# if found, remembers the name in self._extra for subsequent updates.
909     def __getitem__(self, item):
910 if super(TagsDirectory, self).__contains__(item):
911 return super(TagsDirectory, self).__getitem__(item)
912 with llfuse.lock_released:
913 tags = self.api.links().list(
915 ['link_class', '=', 'tag'],
917 *self._filters_for('links', qualified=False),
920 ).execute(num_retries=self.num_retries)
922 self._extra.add(item)
924 return super(TagsDirectory, self).__getitem__(item)
928     def __contains__(self, k):
929 if super(TagsDirectory, self).__contains__(k):
939 class TagDirectory(Directory):
940     """A special directory that contains as subdirectories all collections visible
941     to the user that are tagged with a particular tag.
# NOTE(review): source view is elided (line numbers skip); comments cover
# only the visible fragments.
944     def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
945                  poll=False, poll_time=60):
946 super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
948 self.num_retries = num_retries
951 self._poll_time = poll_time
953     def want_event_subscribe(self):
# Update: list tag links named self.tag whose head is a collection, and
# merge the head_uuids into CollectionDirectory entries.
958 with llfuse.lock_released:
959 taggedcollections = self.api.links().list(
961 ['link_class', '=', 'tag'],
962 ['name', '=', self.tag],
963 ['head_uuid', 'is_a', 'arvados#collection'],
964 *self._filters_for('links', qualified=False),
966 select=['head_uuid'],
967 ).execute(num_retries=self.num_retries)
969 taggedcollections['items'],
970 lambda i: i['head_uuid'],
971 lambda a, i: a.collection_locator == i['head_uuid'],
972 lambda i: CollectionDirectory(
984 class ProjectDirectory(Directory):
985 """A special directory that contains the contents of a project."""
987 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
988 project_object, poll=True, poll_time=3, storage_classes=None):
989 super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
991 self.num_retries = num_retries
992 self.project_object = project_object
993 self.project_object_file = None
994 self.project_uuid = project_object['uuid']
996 self._poll_time = poll_time
997 self._updating_lock = threading.Lock()
998 self._current_user = None
999 self._full_listing = False
1000 self.storage_classes = storage_classes
1002 def want_event_subscribe(self):
1005 def createDirectory(self, i):
1006 common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
1007 if collection_uuid_pattern.match(i['uuid']):
1008 return CollectionDirectory(*common_args, i)
1009 elif group_uuid_pattern.match(i['uuid']):
1010 return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
1011 elif link_uuid_pattern.match(i['uuid']):
1012 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
1013 return CollectionDirectory(*common_args, i['head_uuid'])
1016 elif uuid_pattern.match(i['uuid']):
1017 return ObjectFile(self.parent_inode, i)
1022 return self.project_uuid
1025 self._full_listing = True
1026 return super(ProjectDirectory, self).items()
1028 def namefn(self, i):
1030 if i['name'] is None or len(i['name']) == 0:
1032 elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
1033 # collection or subproject
1035 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
1038 elif 'kind' in i and i['kind'].startswith('arvados#'):
1040 return "{}.{}".format(i['name'], i['kind'][8:])
1047 if self.project_object_file == None:
1048 self.project_object_file = ObjectFile(self.inode, self.project_object)
1049 self.inodes.add_entry(self.project_object_file)
1051 if not self._full_listing:
1055 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
1056 return a.uuid() == i['uuid']
1057 elif isinstance(a, ObjectFile):
1058 return a.uuid() == i['uuid'] and not a.stale()
1062 with llfuse.lock_released:
1063 self._updating_lock.acquire()
1064 if not self.stale():
1067 if group_uuid_pattern.match(self.project_uuid):
1068 self.project_object = self.api.groups().get(
1069 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1070 elif user_uuid_pattern.match(self.project_uuid):
1071 self.project_object = self.api.users().get(
1072 uuid=self.project_uuid).execute(num_retries=self.num_retries)
1073 # do this in 2 steps until #17424 is fixed
1074 contents = list(arvados.util.keyset_list_all(
1075 self.api.groups().contents,
1077 num_retries=self.num_retries,
1078 uuid=self.project_uuid,
1080 ['uuid', 'is_a', 'arvados#group'],
1081 ['groups.group_class', 'in', ['project', 'filter']],
1082 *self._filters_for('groups', qualified=True),
1085 contents.extend(obj for obj in arvados.util.keyset_list_all(
1086 self.api.groups().contents,
1088 num_retries=self.num_retries,
1089 uuid=self.project_uuid,
1091 ['uuid', 'is_a', 'arvados#collection'],
1092 *self._filters_for('collections', qualified=True),
1094 ) if obj['current_version_uuid'] == obj['uuid'])
1096 # end with llfuse.lock_released, re-acquire lock
1098 self.merge(contents,
1101 self.createDirectory)
1104 self._updating_lock.release()
1106 def _add_entry(self, i, name):
1107 ent = self.createDirectory(i)
1108 self._entries[name] = self.inodes.add_entry(ent)
1109 return self._entries[name]
1113 def __getitem__(self, k):
1114 if k == '.arvados#project':
1115 return self.project_object_file
1116 elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
1117 return super(ProjectDirectory, self).__getitem__(k)
1118 with llfuse.lock_released:
1119 k2 = self.unsanitize_filename(k)
1121 namefilter = ["name", "=", k]
1123 namefilter = ["name", "in", [k, k2]]
1124 contents = self.api.groups().list(
1126 ["owner_uuid", "=", self.project_uuid],
1127 ["group_class", "in", ["project","filter"]],
1129 *self._filters_for('groups', qualified=False),
1132 ).execute(num_retries=self.num_retries)["items"]
1134 contents = self.api.collections().list(
1136 ["owner_uuid", "=", self.project_uuid],
1138 *self._filters_for('collections', qualified=False),
1141 ).execute(num_retries=self.num_retries)["items"]
1143 if len(contents) > 1 and contents[1]['name'] == k:
1144 # If "foo/bar" and "foo[SUBST]bar" both exist, use
1146 contents = [contents[1]]
1147 name = self.sanitize_filename(self.namefn(contents[0]))
1150 return self._add_entry(contents[0], name)
1155 def __contains__(self, k):
1156 if k == '.arvados#project':
1168 if not self._enable_write:
1170 with llfuse.lock_released:
1171 if not self._current_user:
1172 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
1173 return self._current_user["uuid"] in self.project_object.get("writable_by", [])
1175 def persisted(self):
1180 def mkdir(self, name):
1181 if not self.writable():
1182 raise llfuse.FUSEError(errno.EROFS)
1185 with llfuse.lock_released:
1187 "owner_uuid": self.project_uuid,
1189 "manifest_text": "" }
1190 if self.storage_classes is not None:
1191 c["storage_classes_desired"] = self.storage_classes
1193 self.api.collections().create(body=c).execute(num_retries=self.num_retries)
1194 except Exception as e:
1197 except apiclient_errors.Error as error:
1198 _logger.error(error)
1199 raise llfuse.FUSEError(errno.EEXIST)
1203 def rmdir(self, name):
1204 if not self.writable():
1205 raise llfuse.FUSEError(errno.EROFS)
1207 if name not in self:
1208 raise llfuse.FUSEError(errno.ENOENT)
1209 if not isinstance(self[name], CollectionDirectory):
1210 raise llfuse.FUSEError(errno.EPERM)
1211 if len(self[name]) > 0:
1212 raise llfuse.FUSEError(errno.ENOTEMPTY)
1213 with llfuse.lock_released:
1214 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
# Move/rename a collection from project directory `src` into this one.
# Only collection entries between two ProjectDirectory mounts are
# supported; everything else gets EPERM.  The move is done server-side
# by updating the collection's owner_uuid and name in one API call, then
# the local entry caches are adjusted to match.
# NOTE(review): the dump elides the lines (1225-27) that look up `ent`
# from `src`; comments describe only the visible code.
1219 def rename(self, name_old, name_new, src):
1220 if not self.writable():
1221 raise llfuse.FUSEError(errno.EROFS)
1223 if not isinstance(src, ProjectDirectory):
1224 raise llfuse.FUSEError(errno.EPERM)
1228 if not isinstance(ent, CollectionDirectory):
1229 raise llfuse.FUSEError(errno.EPERM)
1231 if name_new in self:
1232 # POSIX semantics for replacing one directory with another is
1233 # tricky (the target directory must be empty, the operation must be
1234 # atomic which isn't possible with the Arvados API as of this
1235 # writing) so don't support that.
1236 raise llfuse.FUSEError(errno.EPERM)
# Reparent and rename the collection server-side in a single update.
1238 self.api.collections().update(uuid=ent.uuid(),
1239 body={"owner_uuid": self.uuid(),
1240 "name": name_new}).execute(num_retries=self.num_retries)
1242 # Actually move the entry from source directory to this directory.
1243 del src._entries[name_old]
1244 self._entries[name_new] = ent
# Tell the kernel the old name in the source directory is gone.
1245 self.inodes.invalidate_entry(src, name_old)
# Handle a websocket event (`ev`) describing a create/update/delete of
# an object that is (or was) a child of this project, and incrementally
# adjust this directory's entries instead of doing a full refresh.
# NOTE(review): several branch bodies are elided in this dump (e.g. the
# returns after the "moved from/to somewhere else" checks and the
# trashed-handling around 1271-1275); comments cover visible code only.
1248 def child_event(self, ev):
# Event payload: old_attributes/new_attributes may each be absent
# depending on the event type (see the comment block below).
1249 properties = ev.get("properties") or {}
1250 old_attrs = properties.get("old_attributes") or {}
1251 new_attrs = properties.get("new_attributes") or {}
1252 old_attrs["uuid"] = ev["object_uuid"]
1253 new_attrs["uuid"] = ev["object_uuid"]
# Derive the FUSE-visible names for the object before and after the
# change, using this directory's naming function and filename sanitizer.
1254 old_name = self.sanitize_filename(self.namefn(old_attrs))
1255 new_name = self.sanitize_filename(self.namefn(new_attrs))
1257 # create events will have a new name, but not an old name
1258 # delete events will have an old name, but not a new name
1259 # update events will have an old and new name, and they may be same or different
1260 # if they are the same, an unrelated field changed and there is nothing to do.
1262 if old_attrs.get("owner_uuid") != self.project_uuid:
1263 # Was moved from somewhere else, so don't try to remove entry.
1265 if ev.get("object_owner_uuid") != self.project_uuid:
1266 # Was moved to somewhere else, so don't try to add entry
1269 if old_attrs.get("is_trashed"):
1270 # Was previously deleted
1272 if new_attrs.get("is_trashed"):
# Name changed: drop the old entry (if cached) and install the object
# under its new name, invalidating the kernel's cached old dentry.
1276 if new_name != old_name:
1278 if old_name in self._entries:
1279 ent = self._entries[old_name]
1280 del self._entries[old_name]
1281 self.inodes.invalidate_entry(self, old_name)
1285 self._entries[new_name] = ent
1287 self._add_entry(new_attrs, new_name)
1288 elif ent is not None:
# Entry was removed (no new name); release its inode.
1289 self.inodes.del_entry(ent)
1292 class SharedDirectory(Directory):
1293 """A special directory that represents users or groups who have shared projects with me."""
# Build the "shared with me" directory.
#   parent_inode    - integer inode of the parent directory
#   inodes          - inode table shared by the whole mount
#   api             - Arvados API client (api.config feeds the base class)
#   num_retries     - retry count passed to every API call
#   enable_write    - whether entries may be modified
#   filters         - extra API filters applied to listings
#   exclude, poll, poll_time, storage_classes - see visible uses below;
#       `exclude`'s use is on an elided line (1298?) - TODO confirm.
1295 def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
1296 exclude, poll=False, poll_time=60, storage_classes=None):
1297 super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
1299 self.num_retries = num_retries
# Fetch the current user eagerly; update() uses it to decide which
# shared projects are roots (owned by someone else).
1300 self.current_user = api.users().current().execute(num_retries=num_retries)
1302 self._poll_time = poll_time
# Serializes concurrent update() runs.
1303 self._updating_lock = threading.Lock()
1304 self.storage_classes = storage_classes
# NOTE(review): this span is the interior of SharedDirectory's refresh
# method - its `def` line (around original 1306-1308) is elided from this
# dump, as are many interior lines.  Comments below annotate only the
# visible code; hidden control flow (the enclosing try/finally, loop
# structure of the paginated fetch) should be confirmed in the full file.
# Overall strategy visible here: collect the set of "root" projects
# shared with the current user plus their owners, then populate this
# directory with one entry per owner (user or group).
1309 with llfuse.lock_released:
1310 self._updating_lock.acquire()
# Another thread may have refreshed us while we waited for the lock.
1311 if not self.stale():
# Prefer the dedicated groups.shared endpoint when the API server's
# discovery document advertises it; otherwise fall back to listing
# all readable projects (the keyset_list_all branch below).
1319 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1320 if 'httpMethod' in methods.get('shared', {}):
1323 resp = self.api.groups().shared(
1325 ['group_class', 'in', ['project','filter']],
1327 *self._filters_for('groups', qualified=False),
# Ask the server to include each project's owner object in the response.
1332 include="owner_uuid",
1334 if not resp["items"]:
# Advance the keyset page: next page starts after the last uuid seen.
1336 page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1337 for r in resp["items"]:
1338 objects[r["uuid"]] = r
1339 roots.append(r["uuid"])
# The "included" owner objects become top-level names in this directory.
1340 for r in resp["included"]:
1341 objects[r["uuid"]] = r
1342 root_owners.add(r["uuid"])
# Fallback path: no groups.shared support; list every readable project
# and derive roots/owners locally.
1344 all_projects = list(arvados.util.keyset_list_all(
1345 self.api.groups().list,
1347 num_retries=self.num_retries,
1349 ['group_class', 'in', ['project','filter']],
1350 *self._filters_for('groups', qualified=False),
1352 select=["uuid", "owner_uuid"],
1354 for ob in all_projects:
1355 objects[ob['uuid']] = ob
# A project is a sharing "root" when its owner is neither the current
# user nor another project we can already see.
1357 current_uuid = self.current_user['uuid']
1358 for ob in all_projects:
1359 if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1360 roots.append(ob['uuid'])
1361 root_owners.add(ob['owner_uuid'])
# Resolve the owner uuids to user and group records so we can name them.
1363 lusers = arvados.util.keyset_list_all(
1364 self.api.users().list,
1366 num_retries=self.num_retries,
1368 ['uuid', 'in', list(root_owners)],
1369 *self._filters_for('users', qualified=False),
1372 lgroups = arvados.util.keyset_list_all(
1373 self.api.groups().list,
1375 num_retries=self.num_retries,
1377 ['uuid', 'in', list(root_owners)+roots],
1378 *self._filters_for('groups', qualified=False),
1382 objects[l["uuid"]] = l
1384 objects[l["uuid"]] = l
# Build the visible name -> owner-object mapping: groups display their
# "name", users display "First Last".
1386 for r in root_owners:
1390 contents[obr["name"]] = obr
1391 elif "first_name" in obr:
1392 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
# Root projects whose owner object is unknown get listed directly.
1397 if obr['owner_uuid'] not in objects:
1398 contents[obr["name"]] = obr
1400 # end with llfuse.lock_released, re-acquire lock
# Merge `contents` into self._entries (merge call partly elided):
# match existing entries by uuid, create ProjectDirectory for new ones.
1405 lambda a, i: a.uuid() == i[1]['uuid'],
1406 lambda i: ProjectDirectory(
1415 poll_time=self._poll_time,
1416 storage_classes=self.storage_classes,
# Best-effort refresh: log any failure rather than crashing the mount.
1420 _logger.exception("arv-mount shared dir error")
# Always release the update lock (enclosing try/finally is elided).
1422 self._updating_lock.release()
1424 def want_event_subscribe(self):