9 from apiclient import errors as apiclient_errors
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter, check_update
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18 _logger = logging.getLogger('arvados.arvados_fuse')
21 # Match any character which FUSE or Linux cannot accommodate as part
22 # of a filename. (If present in a collection filename, they will
23 # appear as underscores in the fuse mount.)
24 _disallowed_filename_characters = re.compile('[\x00/]')
26 # '.' and '..' are not reachable if API server is newer than #6277
# Map an entry name to one that can be exposed through FUSE: every NUL or "/"
# character is replaced with "_" using the module-level compiled regex.
# NOTE(review): the listing jumps from line 28 to line 38, so any special-case
# handling that runs before the final substitution is not visible here.
27 def sanitize_filename(dirty):
28 """Replace disallowed filename characters with harmless "_"."""
38 return _disallowed_filename_characters.sub('_', dirty)
41 class Directory(FreshBase):
42 """Generic directory object, backed by a dict.
44 Consists of a set of entries with the key representing the filename
45 and the value referencing a File or Directory object.
# Initialise a directory node.
#   parent_inode: inode number of the containing directory; must be an int,
#                 otherwise a plain Exception is raised.
#   inodes: not referenced on any visible line (lines 52-53 and 57-58 are
#           missing from this listing) — presumably stored as self.inodes,
#           which other methods of this class use; confirm.
48 def __init__(self, parent_inode, inodes):
49 """parent_inode is the integer inode number"""
51 super(Directory, self).__init__()
54 if not isinstance(parent_inode, int):
55 raise Exception("parent_inode should be an int")
56 self.parent_inode = parent_inode
# The construction time serves as the initial modification time.
59 self._mtime = time.time()
61 # Overridden by subclasses to implement logic to update the entries dict
62 # when the directory is stale
67 # Only used when computing the size of the disk footprint of the directory
75 def checkupdate(self):
79 except apiclient.errors.HttpError as e:
# Return the entry stored under the name `item`. A missing name is not handled
# here, so the lookup error raised by self._entries propagates to the caller.
84 def __getitem__(self, item):
85 return self._entries[item]
90 return list(self._entries.items())
# Report whether an entry named `k` is currently present in self._entries.
94 def __contains__(self, k):
95 return k in self._entries
100 return len(self._entries)
103 self.inodes.touch(self)
104 super(Directory, self).fresh()
# Reconcile self._entries with a fresh listing; the docstring below describes
# the three callbacks.
# NOTE(review): several statements of this method, including the loop headers,
# sit on line numbers missing from this listing, so the inline notes cover
# only what is shown.
106 def merge(self, items, fn, same, new_entry):
107 """Helper method for updating the contents of the directory.
109 Takes a list describing the new contents of the directory, reuse
110 entries that are the same in both the old and new lists, create new
111 entries, and delete old entries missing from the new list.
113 :items: iterable with new directory contents
115 :fn: function to take an entry in 'items' and return the desired file or
116 directory name, or None if this entry should be skipped
118 :same: function to compare an existing entry (a File or Directory
119 object) with an entry in the items list to determine whether to keep
122 :new_entry: function to create a new directory entry (File or Directory
123 object) from an entry in the items list.
127 oldentries = self._entries
# Names returned by fn() are sanitized before being used as entry keys.
131 name = sanitize_filename(fn(i))
133 if name in oldentries and same(oldentries[name], i):
134 # move existing directory entry over
135 self._entries[name] = oldentries[name]
138 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
139 # create new directory entry
# `ent` is assigned on a line not shown — presumably new_entry(i); confirm.
142 self._entries[name] = self.inodes.add_entry(ent)
145 # delete any other directory entries that were not found in 'items'
147 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
148 self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
149 self.inodes.del_entry(oldentries[i])
# Invalidate this directory's own inode and bump its mtime. The condition
# guarding these two statements, if any, is on a line not shown.
153 self.inodes.invalidate_inode(self.inode)
154 self._mtime = time.time()
# Remove every entry from this directory, clearing each child first.
# Work is attempted only when the directory is not in use, unless `force` is
# true.
# NOTE(review): the loop headers and return statements are on lines missing
# from this listing. The `if not oldentries[n].clear(force)` test suggests
# clear() returns a success flag — confirm.
158 def clear(self, force=False):
159 """Delete all entries"""
161 if not self.in_use() or force:
162 oldentries = self._entries
165 if not oldentries[n].clear(force):
# A child could not be cleared: put the saved entry table back.
166 self._entries = oldentries
169 self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
170 self.inodes.del_entry(oldentries[n])
171 self.inodes.invalidate_inode(self.inode)
# File creation is not supported by the generic Directory; this always raises
# NotImplementedError. CollectionDirectoryBase provides a real implementation.
186 def create(self, name):
187 raise NotImplementedError()
# Subdirectory creation is not supported by the generic Directory; this always
# raises NotImplementedError. Subclasses that allow it override this method.
189 def mkdir(self, name):
190 raise NotImplementedError()
# File removal is not supported by the generic Directory; this always raises
# NotImplementedError. Subclasses that allow it override this method.
192 def unlink(self, name):
193 raise NotImplementedError()
# Subdirectory removal is not supported by the generic Directory; this always
# raises NotImplementedError. Subclasses that allow it override this method.
195 def rmdir(self, name):
196 raise NotImplementedError()
# Renaming is not supported by the generic Directory; this always raises
# NotImplementedError. In the overriding implementations in this file, `src`
# is the directory object that currently holds `name_old`.
198 def rename(self, name_old, name_new, src):
199 raise NotImplementedError()
202 class CollectionDirectoryBase(Directory):
203 """Represent an Arvados Collection as a directory.
205 This class is used for Subcollections, and is also the base class for
206 CollectionDirectory, which implements collection loading/saving on
209 Most operations act only on the underlying Arvados `Collection` object. The
210 `Collection` object signals via a notify callback to
211 `CollectionDirectoryBase.on_event` that an item was added, removed or
212 modified. FUSE inodes and directory entries are created, deleted or
213 invalidated in response to these events.
# Initialise the base Directory, then keep a reference to the collection
# object whose contents this directory mirrors.
217 def __init__(self, parent_inode, inodes, collection):
218 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
219 self.collection = collection
# Attach collection member `item` to this directory under the sanitized `name`.
#  - An item that already has a fuse_entry is being re-parented: that entry
#    must be flagged dead and still hold an inode, otherwise an Exception is
#    raised. It is then marked alive again and reused.
#  - An item that is a RichCollectionBase becomes a nested
#    CollectionDirectoryBase, which is populated immediately with `mtime`.
#  - NOTE(review): line 233 is missing from this listing. Line 234 presumably
#    belongs to the fallback branch that wraps the item in a FuseArvadosFile.
#    Which branches line 235 (the back-reference) applies to cannot be told
#    from here — confirm.
221 def new_entry(self, name, item, mtime):
222 name = sanitize_filename(name)
223 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
224 if item.fuse_entry.dead is not True:
225 raise Exception("Can only reparent dead inode entry")
226 if item.fuse_entry.inode is None:
227 raise Exception("Reparented entry must still have valid inode")
228 item.fuse_entry.dead = False
229 self._entries[name] = item.fuse_entry
230 elif isinstance(item, arvados.collection.RichCollectionBase):
231 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
232 self._entries[name].populate(mtime)
234 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
# Store the directory entry on the item so later calls can find and reuse it.
235 item.fuse_entry = self._entries[name]
# Change-notification callback; populate() registers it with
# self.collection.subscribe(). Events whose `collection` does not equal
# self.collection are ignored. Otherwise the name is sanitized and the event
# is applied:
#   ADD -> create a directory entry through new_entry()
#   DEL -> drop the entry, invalidate it in the inode table, delete its inode
#          entry (a name not in self._entries raises from the lookup)
#   MOD -> invalidate the inode of item.fuse_entry when the item has one,
#          otherwise the inode of the entry stored under `name`, if any
# NOTE(review): line 241 is missing from this listing.
237 def on_event(self, event, collection, name, item):
238 if collection == self.collection:
239 name = sanitize_filename(name)
240 _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
242 if event == arvados.collection.ADD:
243 self.new_entry(name, item, self.mtime())
244 elif event == arvados.collection.DEL:
245 ent = self._entries[name]
246 del self._entries[name]
247 self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
248 self.inodes.del_entry(ent)
249 elif event == arvados.collection.MOD:
250 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
251 self.inodes.invalidate_inode(item.fuse_entry.inode)
252 elif name in self._entries:
253 self.inodes.invalidate_inode(self._entries[name].inode)
# Subscribe on_event to the collection's change notifications, then create a
# directory entry for every item currently in the collection.
# NOTE(review): the `mtime` argument is not referenced on any visible line
# (line 256 is missing from this listing). Entries are created with
# self.mtime(), so `mtime` is presumably stored first — confirm.
255 def populate(self, mtime):
257 self.collection.subscribe(self.on_event)
258 for entry, item in self.collection.items():
259 self.new_entry(entry, item, self.mtime())
262 return self.collection.writable()
266 with llfuse.lock_released:
267 self.collection.root_collection().save()
# Create `name` in the collection by opening it for writing and closing it at
# once. The global llfuse lock is released for the duration of the call.
271 def create(self, name):
272 with llfuse.lock_released:
273 self.collection.open(name, "w").close()
# Create the subcollection `name` through Collection.mkdirs(). The global
# llfuse lock is released for the duration of the call.
277 def mkdir(self, name):
278 with llfuse.lock_released:
279 self.collection.mkdirs(name)
# Remove the file `name` from the collection. The global llfuse lock is
# released for the duration of the call.
# NOTE(review): lines 286-289 are missing from this listing, so any follow-up
# step after the removal is not visible.
283 def unlink(self, name):
284 with llfuse.lock_released:
285 self.collection.remove(name)
# Remove the subdirectory `name` from the collection. The visible body uses the
# same Collection.remove() call as unlink(), with the global llfuse lock
# released for the duration of the call.
# NOTE(review): lines 293-296 are missing from this listing, so any follow-up
# step after the removal is not visible.
290 def rmdir(self, name):
291 with llfuse.lock_released:
292 self.collection.remove(name)
# Rename or move `name_old` from the directory `src` to `name_new` in this
# directory. The work is delegated to Collection.rename() with overwrite=True
# and src.collection as the source.
# Raises llfuse.FUSEError with:
#   EPERM     - src is not a CollectionDirectoryBase
#   ENOTEMPTY - directory onto an existing directory (the test guarding this
#               raise is on line 307, missing from this listing)
#   ENOTDIR   - directory onto an existing file
#   EISDIR    - file onto an existing directory
# NOTE(review): `ent` and `tgt` are assigned on lines not shown — presumably
# the source entry and any existing target entry. What happens for file onto
# file (line 305) is also not shown; confirm both.
297 def rename(self, name_old, name_new, src):
298 if not isinstance(src, CollectionDirectoryBase):
299 raise llfuse.FUSEError(errno.EPERM)
304 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
306 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
308 raise llfuse.FUSEError(errno.ENOTEMPTY)
309 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
310 raise llfuse.FUSEError(errno.ENOTDIR)
311 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
312 raise llfuse.FUSEError(errno.EISDIR)
314 with llfuse.lock_released:
315 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
320 class CollectionDirectory(CollectionDirectoryBase):
321 """Represents the root of a directory tree representing a collection."""
# Create the root directory for one collection. The base class is initialised
# with collection=None, so no collection object is attached yet.
#   collection_record: either an API record dict (its 'uuid' becomes the
#       locator and 'modified_at' the mtime) or, per line 333, a value used
#       directly as the locator (the branch keyword on line 332 is missing).
#   api, explicit_collection: not referenced on any visible line (lines 325
#       and 334 are missing from this listing) — confirm how they are used.
# self._writable is derived from whether the locator matches uuid_pattern.
323 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
324 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
326 self.num_retries = num_retries
327 self.collection_record_file = None
328 self.collection_record = None
329 if isinstance(collection_record, dict):
330 self.collection_locator = collection_record['uuid']
331 self._mtime = convertTime(collection_record.get('modified_at'))
333 self.collection_locator = collection_record
335 self._manifest_size = 0
336 if self.collection_locator:
337 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
338 self._updating_lock = threading.Lock()
341 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
344 return self.collection.writable() if self.collection is not None else self._writable
346 # Used by arv-web.py to switch the contents of the CollectionDirectory
# Point this directory at a different collection locator and drop the cached
# record. With collection_record set to None, the test on line 416 of update()
# calls new_collection() on the next refresh.
# NOTE(review): line 355 is missing from this listing, so whether a refresh is
# triggered right here is not visible.
347 def change_collection(self, new_locator):
348 """Switch the contents of the CollectionDirectory.
350 Must be called with llfuse.lock held.
353 self.collection_locator = new_locator
354 self.collection_record = None
# Replace this directory's contents with a newly loaded collection:
# force-clear the existing entries, store the new record, refresh the mtime
# and locator from it, push the record into the '.arvados#collection' file
# object if one exists, then attach `coll_reader` and repopulate.
# NOTE(review): lines 358, 360, 362 and 368 are missing from this listing, so
# a guard around clear() or the record handling may exist.
357 def new_collection(self, new_collection_record, coll_reader):
359 self.clear(force=True)
361 self.collection_record = new_collection_record
363 if self.collection_record:
364 self._mtime = convertTime(self.collection_record.get('modified_at'))
365 self.collection_locator = self.collection_record["uuid"]
366 if self.collection_record_file is not None:
367 self.collection_record_file.update(self.collection_record)
369 self.collection = coll_reader
370 self.populate(self.mtime())
373 return self.collection_locator
# Refresh this directory from the API server / Keep.
# Visible flow (the branch structure is partly on lines missing from this
# listing):
#   - early tests for "record cached and locator is a portable data hash" and
#     for "no locator"; their outcomes are not shown;
#   - with the global llfuse lock released, self._updating_lock is acquired;
#   - when a collection object is already attached, it is checked with
#     known_past_version(to_record_version) and updated in place;
#   - a Collection (locator matches uuid_pattern) or a CollectionReader
#     (line 403) is constructed, missing record fields are filled in, and
#     new_collection() is called when no record is cached or the portable data
#     hash differs;
#   - self._manifest_size is recomputed from the manifest text;
#   - NotFoundError, ArgumentError and one further handler (its `except` line
#     433 is not shown) log the failure.
# NOTE(review): acquire() on line 387 and release() on line 426 are not paired
# by any visible try/finally or `with`. Confirm that the missing lines release
# the lock on every path.
376 def update(self, to_record_version=None):
378 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
381 if self.collection_locator is None:
386 with llfuse.lock_released:
387 self._updating_lock.acquire()
391 _logger.debug("Updating %s", to_record_version)
392 if self.collection is not None:
393 if self.collection.known_past_version(to_record_version):
394 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
396 self.collection.update()
398 if uuid_pattern.match(self.collection_locator):
399 coll_reader = arvados.collection.Collection(
400 self.collection_locator, self.api, self.api.keep,
401 num_retries=self.num_retries)
403 coll_reader = arvados.collection.CollectionReader(
404 self.collection_locator, self.api, self.api.keep,
405 num_retries=self.num_retries)
406 new_collection_record = coll_reader.api_response() or {}
407 # If the Collection only exists in Keep, there will be no API
408 # response. Fill in the fields we need.
409 if 'uuid' not in new_collection_record:
410 new_collection_record['uuid'] = self.collection_locator
411 if "portable_data_hash" not in new_collection_record:
412 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
413 if 'manifest_text' not in new_collection_record:
414 new_collection_record['manifest_text'] = coll_reader.manifest_text()
416 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
417 self.new_collection(new_collection_record, coll_reader)
419 self._manifest_size = len(coll_reader.manifest_text())
420 _logger.debug("%s manifest_size %i", self, self._manifest_size)
421 # end with llfuse.lock_released, re-acquire lock
426 self._updating_lock.release()
427 except arvados.errors.NotFoundError as e:
428 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
429 except arvados.errors.ArgumentError as detail:
430 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
431 if self.collection_record is not None and "manifest_text" in self.collection_record:
432 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
434 _logger.exception("arv-mount %s: error", self.collection_locator)
435 if self.collection_record is not None and "manifest_text" in self.collection_record:
436 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
# Look up an entry by name. The virtual name '.arvados#collection' maps to an
# ObjectFile wrapping the cached collection record. It is created and
# registered with the inode table on first access, then reused. Line 448
# defers to the base Directory lookup (the branch keyword on line 447 is
# missing from this listing).
441 def __getitem__(self, item):
442 if item == '.arvados#collection':
443 if self.collection_record_file is None:
444 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
445 self.inodes.add_entry(self.collection_record_file)
446 return self.collection_record_file
448 return super(CollectionDirectory, self).__getitem__(item)
# Membership test that special-cases the virtual name '.arvados#collection'.
# Line 454 defers to the base Directory check.
# NOTE(review): the result returned for the virtual name is on lines 452-453,
# which are missing from this listing.
450 def __contains__(self, k):
451 if k == '.arvados#collection':
454 return super(CollectionDirectory, self).__contains__(k)
# Drop the cached collection record and the file object that exposes it, then
# run the base-class invalidation.
456 def invalidate(self):
457 self.collection_record = None
458 self.collection_record_file = None
459 super(CollectionDirectory, self).invalidate()
462 return (self.collection_locator is not None)
465 # This is an empirically-derived heuristic to estimate the memory used
466 # to store this collection's metadata. Calculating the memory
467 # footprint directly would be more accurate, but also more complicated.
468 return self._manifest_size * 128
471 if self.collection is not None:
473 self.collection.save()
474 self.collection.stop_threads()
477 class MagicDirectory(Directory):
478 """A special directory that logically contains the set of all extant keep locators.
480 When a file is referenced by lookup(), it is tested to see if it is a valid
481 keep locator to a manifest, and if so, loads the manifest contents as a
482 subdirectory of this directory with the locator as the directory name.
483 Since querying a list of all extant keep locators is impractical, only
484 collections that have already been accessed are visible to readdir().
489 This directory provides access to Arvados collections as subdirectories listed
490 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
491 the form '1234567890abcdefghijklmnopqrstuv+123').
493 Note that this directory will appear empty until you attempt to access a
494 specific collection subdirectory (such as trying to 'cd' into it), at which
495 point the collection will actually be looked up on the server and the directory
496 will appear if it exists.
# Initialise the base Directory and remember the retry count for API calls.
# NOTE(review): `api` and `by_pdh` are not referenced on the visible lines
# (501 and 503 are missing from this listing). They are presumably stored as
# self.api and self.by_pdh, which other methods of this class read — confirm.
499 def __init__(self, parent_inode, inodes, api, num_retries, by_pdh=False):
500 super(MagicDirectory, self).__init__(parent_inode, inodes)
502 self.num_retries = num_retries
# Attribute-assignment hook. The attribute is stored normally first. When the
# attribute is 'inode', the new value is not None, and the directory has no
# entries yet, a README StringFile is added. The `name == 'inode'` test comes
# first so self._entries is read only on that assignment.
# If this directory is the FUSE root, a 'by_id' child MagicDirectory is added
# as well. It is constructed without by_pdh, so it uses the default (False).
505 def __setattr__(self, name, value):
506 super(MagicDirectory, self).__setattr__(name, value)
507 # When we're assigned an inode, add a README.
508 if ((name == 'inode') and (self.inode is not None) and
509 (not self._entries)):
510 self._entries['README'] = self.inodes.add_entry(
511 StringFile(self.inode, self.README_TEXT, time.time()))
512 # If we're the root directory, add an identical by_id subdirectory.
513 if self.inode == llfuse.ROOT_INODE:
514 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
515 self.inode, self.inodes, self.api, self.num_retries))
# Membership test that loads collections on demand: a name that is not yet an
# entry but looks like a UUID or portable data hash is turned into a
# CollectionDirectory and registered with the inode table.
#   - In by_pdh mode a UUID-shaped name raises FUSEError(ENOENT).
#   - The return statements and the call that loads the new directory are on
#     lines missing from this listing.
# NOTE(review): likely bug — `except Exception as e` rebinds `e`, so line 542
# passes the exception object, not the CollectionDirectory created on line 528,
# to del_entry(). If add_entry() itself raised, no entry exists to delete. Bind
# the exception to a different name and delete the entry only when it was
# created.
517 def __contains__(self, k):
518 if k in self._entries:
521 if self.by_pdh and uuid_pattern.match(k):
522 raise llfuse.FUSEError(errno.ENOENT)
524 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
528 e = self.inodes.add_entry(CollectionDirectory(
529 self.inode, self.inodes, self.api, self.num_retries, k))
532 if k not in self._entries:
535 self.inodes.del_entry(e)
538 self.inodes.del_entry(e)
540 except Exception as e:
541 _logger.debug('arv-mount exception keep %s', e)
542 self.inodes.del_entry(e)
# Return the entry for `item`, or raise KeyError("No collection with id ...").
# NOTE(review): the test choosing between the two outcomes is on line 546,
# missing from this listing — presumably `item in self`, which would trigger
# the on-demand lookup in __contains__; confirm.
545 def __getitem__(self, item):
547 return self._entries[item]
549 raise KeyError("No collection with id " + item)
551 def clear(self, force=False):
555 class RecursiveInvalidateDirectory(Directory):
# Invalidate this directory through the base class, then invalidate every
# entry it currently holds.
# NOTE(review): lines 557 and 561+ are missing from this listing, so any
# error handling around these calls is not visible.
556 def invalidate(self):
558 super(RecursiveInvalidateDirectory, self).invalidate()
559 for a in self._entries:
560 self._entries[a].invalidate()
565 class TagsDirectory(RecursiveInvalidateDirectory):
566 """A special directory that contains as subdirectories all tags visible to the user."""
# Initialise the base directory and keep the retry count and poll interval.
# poll_time defaults to 60; its unit is not stated on the visible lines.
# NOTE(review): `api` is not referenced on the visible lines (570 and 572 are
# missing from this listing). The listing code in this class reads self.api
# and self._poll, so both are presumably assigned there — confirm.
568 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
569 super(TagsDirectory, self).__init__(parent_inode, inodes)
571 self.num_retries = num_retries
573 self._poll_time = poll_time
577 with llfuse.lock_released:
578 tags = self.api.links().list(
579 filters=[['link_class', '=', 'tag']],
580 select=['name'], distinct=True
581 ).execute(num_retries=self.num_retries)
583 self.merge(tags['items'],
585 lambda a, i: a.tag == i['name'],
586 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
589 class TagDirectory(Directory):
590 """A special directory that contains as subdirectories all collections visible
591 to the user that are tagged with a particular tag.
# Initialise the base directory and keep the retry count and poll interval.
# NOTE(review): `api`, `tag` and `poll` are not referenced on the visible lines
# (597, 599 and 600 are missing from this listing). The listing code in this
# class reads self.api and self.tag, so they are presumably assigned there —
# confirm.
594 def __init__(self, parent_inode, inodes, api, num_retries, tag,
595 poll=False, poll_time=60):
596 super(TagDirectory, self).__init__(parent_inode, inodes)
598 self.num_retries = num_retries
601 self._poll_time = poll_time
605 with llfuse.lock_released:
606 taggedcollections = self.api.links().list(
607 filters=[['link_class', '=', 'tag'],
608 ['name', '=', self.tag],
609 ['head_uuid', 'is_a', 'arvados#collection']],
611 ).execute(num_retries=self.num_retries)
612 self.merge(taggedcollections['items'],
613 lambda i: i['head_uuid'],
614 lambda a, i: a.collection_locator == i['head_uuid'],
615 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
618 class ProjectDirectory(Directory):
619 """A special directory that contains the contents of a project."""
# Initialise a directory for one project.
#   project_object: API record for the project; its 'uuid' is cached as
#       self.project_uuid.
# The '.arvados#project' file object and the current-user record both start
# as None. self._updating_lock is a threading.Lock.
# NOTE(review): `api` and `poll` are not referenced on the visible lines (624
# and 629 are missing from this listing) — presumably stored as self.api and
# self._poll, which createDirectory() reads; confirm.
621 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
622 poll=False, poll_time=60):
623 super(ProjectDirectory, self).__init__(parent_inode, inodes)
625 self.num_retries = num_retries
626 self.project_object = project_object
627 self.project_object_file = None
628 self.project_uuid = project_object['uuid']
630 self._poll_time = poll_time
631 self._updating_lock = threading.Lock()
632 self._current_user = None
# Build the directory or file object for one API record `i`, chosen by the
# type of its UUID:
#   collection UUID -> CollectionDirectory for the record
#   group UUID      -> nested ProjectDirectory, inheriting the poll settings
#   link UUID       -> CollectionDirectory for the link's head_uuid, when the
#                      head is a collection or head_uuid is a portable data
#                      hash
#   any other UUID  -> ObjectFile exposing the raw record
# NOTE(review): lines 642-643 and 646+ are missing from this listing, so the
# results for non-matching links and non-UUID records are not visible.
# NOTE(review): ObjectFile is given self.parent_inode while every other branch
# passes self.inode — confirm this is intended.
634 def createDirectory(self, i):
635 if collection_uuid_pattern.match(i['uuid']):
636 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
637 elif group_uuid_pattern.match(i['uuid']):
638 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
639 elif link_uuid_pattern.match(i['uuid']):
640 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
641 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
644 elif uuid_pattern.match(i['uuid']):
645 return ObjectFile(self.parent_inode, i)
650 return self.project_uuid
654 if self.project_object_file == None:
655 self.project_object_file = ObjectFile(self.inode, self.project_object)
656 self.inodes.add_entry(self.project_object_file)
660 if i['name'] is None or len(i['name']) == 0:
662 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
663 # collection or subproject
665 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
668 elif 'kind' in i and i['kind'].startswith('arvados#'):
670 return "{}.{}".format(i['name'], i['kind'][8:])
675 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
676 return a.uuid() == i['uuid']
677 elif isinstance(a, ObjectFile):
678 return a.uuid() == i['uuid'] and not a.stale()
682 with llfuse.lock_released:
683 self._updating_lock.acquire()
687 if group_uuid_pattern.match(self.project_uuid):
688 self.project_object = self.api.groups().get(
689 uuid=self.project_uuid).execute(num_retries=self.num_retries)
690 elif user_uuid_pattern.match(self.project_uuid):
691 self.project_object = self.api.users().get(
692 uuid=self.project_uuid).execute(num_retries=self.num_retries)
694 contents = arvados.util.list_all(self.api.groups().contents,
695 self.num_retries, uuid=self.project_uuid)
697 # end with llfuse.lock_released, re-acquire lock
702 self.createDirectory)
704 self._updating_lock.release()
# Look up an entry by name. The virtual name '.arvados#project' maps to
# self.project_object_file. Line 712 defers to the base Directory lookup (the
# branch keyword on line 711 is missing from this listing).
# NOTE(review): project_object_file starts as None in __init__ and is not
# created here, so this can return None if nothing has created it yet —
# confirm the call order.
708 def __getitem__(self, item):
709 if item == '.arvados#project':
710 return self.project_object_file
712 return super(ProjectDirectory, self).__getitem__(item)
# Membership test that special-cases the virtual name '.arvados#project'.
# Line 718 defers to the base Directory check.
# NOTE(review): the result returned for the virtual name is on lines 716-717,
# which are missing from this listing.
714 def __contains__(self, k):
715 if k == '.arvados#project':
718 return super(ProjectDirectory, self).__contains__(k)
723 with llfuse.lock_released:
724 if not self._current_user:
725 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
726 return self._current_user["uuid"] in self.project_object["writable_by"]
# Create a new collection with an empty manifest, owned by this project,
# through the API with the global llfuse lock released.
# A failure raised as apiclient_errors.Error is reported to FUSE as EEXIST.
# NOTE(review): lines 734, 737, 739 and 741 are missing from this listing. The
# `name` argument is presumably placed in the request body on line 737, and
# line 741 may log the error or guard the raise — confirm.
733 def mkdir(self, name):
735 with llfuse.lock_released:
736 self.api.collections().create(body={"owner_uuid": self.project_uuid,
738 "manifest_text": ""}).execute(num_retries=self.num_retries)
740 except apiclient_errors.Error as error:
742 raise llfuse.FUSEError(errno.EEXIST)
# Delete the collection shown as subdirectory `name`.
# Raises llfuse.FUSEError with:
#   ENOENT    - the test is on line 747, missing from this listing; presumably
#               raised when `name` is not in this directory
#   EPERM     - the entry is not a CollectionDirectory
#   ENOTEMPTY - the collection directory still has entries
# The API delete call runs with the global llfuse lock released.
# NOTE(review): lines after 754 are missing from this listing, so any cleanup
# after the delete is not visible.
746 def rmdir(self, name):
748 raise llfuse.FUSEError(errno.ENOENT)
749 if not isinstance(self[name], CollectionDirectory):
750 raise llfuse.FUSEError(errno.EPERM)
751 if len(self[name]) > 0:
752 raise llfuse.FUSEError(errno.ENOTEMPTY)
753 with llfuse.lock_released:
754 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
# Move a collection from the project directory `src` into this project under
# `name_new`. The collection's owner_uuid and name are updated through the
# API, then the in-memory entry is moved from src to self and the old name is
# invalidated in the inode table.
# Raises llfuse.FUSEError(EPERM) when src is not a ProjectDirectory, when the
# entry is not a CollectionDirectory, and on line 773 (its guarding test on
# line 768 is missing from this listing; per the comment it rejects replacing
# an existing target).
# NOTE(review): `ent` is assigned on a line not shown — presumably the entry
# `name_old` in src; confirm.
759 def rename(self, name_old, name_new, src):
760 if not isinstance(src, ProjectDirectory):
761 raise llfuse.FUSEError(errno.EPERM)
765 if not isinstance(ent, CollectionDirectory):
766 raise llfuse.FUSEError(errno.EPERM)
769 # POSIX semantics for replacing one directory with another are
770 # tricky (the target directory must be empty, the operation must be
771 # atomic which isn't possible with the Arvados API as of this
772 # writing) so don't support that.
773 raise llfuse.FUSEError(errno.EPERM)
775 self.api.collections().update(uuid=ent.uuid(),
776 body={"owner_uuid": self.uuid(),
777 "name": name_new}).execute(num_retries=self.num_retries)
779 # Actually move the entry from source directory to this directory.
780 del src._entries[name_old]
781 self._entries[name_new] = ent
782 self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
785 class SharedDirectory(Directory):
786 """A special directory that represents users or groups who have shared projects with me."""
# Initialise the directory of projects shared with the current user.
# The current-user record is fetched from the API during construction, so
# creating this object makes a request through `api`.
# NOTE(review): `exclude` and `poll` are not referenced on the visible lines
# (791 and 794 are missing from this listing), and the assignment of self.api,
# which the listing code reads, is also not shown — confirm.
788 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
789 poll=False, poll_time=60):
790 super(SharedDirectory, self).__init__(parent_inode, inodes)
792 self.num_retries = num_retries
793 self.current_user = api.users().current().execute(num_retries=num_retries)
795 self._poll_time = poll_time
799 with llfuse.lock_released:
800 all_projects = arvados.util.list_all(
801 self.api.groups().list, self.num_retries,
802 filters=[['group_class','=','project']])
804 for ob in all_projects:
805 objects[ob['uuid']] = ob
809 for ob in all_projects:
810 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
812 root_owners[ob['owner_uuid']] = True
814 lusers = arvados.util.list_all(
815 self.api.users().list, self.num_retries,
816 filters=[['uuid','in', list(root_owners)]])
817 lgroups = arvados.util.list_all(
818 self.api.groups().list, self.num_retries,
819 filters=[['uuid','in', list(root_owners)]])
825 objects[l["uuid"]] = l
827 objects[l["uuid"]] = l
830 for r in root_owners:
834 contents[obr["name"]] = obr
835 #elif obr.get("username"):
836 # contents[obr["username"]] = obr
837 elif "first_name" in obr:
838 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
842 if r['owner_uuid'] not in objects:
843 contents[r['name']] = r
845 # end with llfuse.lock_released, re-acquire lock
848 self.merge(contents.items(),
850 lambda a, i: a.uuid() == i[1]['uuid'],
851 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))