9 from apiclient import errors as apiclient_errors
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter, check_update
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
_logger = logging.getLogger('arvados.arvados_fuse')

# Characters that cannot be part of a filename exposed through FUSE on
# Linux (the NUL byte and the "/" path separator).  When a collection
# filename contains any of them, sanitize_filename() replaces each one
# with an underscore, so that is how the name appears in the mount.
_disallowed_filename_characters = re.compile('[\x00/]')
26 # '.' and '..' are not reachable if API server is newer than #6277
27 def sanitize_filename(dirty):
28 """Replace disallowed filename characters with harmless "_"."""
38 return _disallowed_filename_characters.sub('_', dirty)
41 class Directory(FreshBase):
42 """Generic directory object, backed by a dict.
44 Consists of a set of entries with the key representing the filename
45 and the value referencing a File or Directory object.
48 def __init__(self, parent_inode, inodes):
49 """parent_inode is the integer inode number"""
51 super(Directory, self).__init__()
54 if not isinstance(parent_inode, int):
55 raise Exception("parent_inode should be an int")
56 self.parent_inode = parent_inode
59 self._mtime = time.time()
61 # Overridden by subclasses to implement logic to update the entries dict
62 # when the directory is stale
67 # Only used when computing the size of the disk footprint of the directory
75 def checkupdate(self):
79 except apiclient.errors.HttpError as e:
def __getitem__(self, item):
    """Return the File or Directory object stored under the name *item*.

    Raises KeyError when the directory has no entry with that name.
    """
    entries = self._entries
    return entries[item]
90 return list(self._entries.items())
def __contains__(self, k):
    """Report whether this directory currently has an entry named *k*."""
    entries = self._entries
    return k in entries
100 return len(self._entries)
103 self.inodes.touch(self)
104 super(Directory, self).fresh()
106 def merge(self, items, fn, same, new_entry):
107 """Helper method for updating the contents of the directory.
109 Takes a list describing the new contents of the directory, reuse
110 entries that are the same in both the old and new lists, create new
111 entries, and delete old entries missing from the new list.
113 :items: iterable with new directory contents
115 :fn: function to take an entry in 'items' and return the desired file or
116 directory name, or None if this entry should be skipped
118 :same: function to compare an existing entry (a File or Directory
119 object) with an entry in the items list to determine whether to keep
122 :new_entry: function to create a new directory entry (File or Directory
123 object) from an entry in the items list.
127 oldentries = self._entries
131 name = sanitize_filename(fn(i))
133 if name in oldentries and same(oldentries[name], i):
134 # move existing directory entry over
135 self._entries[name] = oldentries[name]
138 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
139 # create new directory entry
142 self._entries[name] = self.inodes.add_entry(ent)
145 # delete any other directory entries that were not in found in 'items'
147 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
148 self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
149 self.inodes.del_entry(oldentries[i])
153 self.inodes.invalidate_inode(self.inode)
154 self._mtime = time.time()
158 def clear(self, force=False):
159 """Delete all entries"""
161 if not self.in_use() or force:
162 oldentries = self._entries
165 if not oldentries[n].clear(force):
166 self._entries = oldentries
169 self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
170 self.inodes.del_entry(oldentries[n])
171 self.inodes.invalidate_inode(self.inode)
def create(self, name):
    """Create an empty file called *name* in this directory.

    The generic Directory is read-only; writable subclasses override this.
    """
    raise NotImplementedError
def mkdir(self, name):
    """Create a subdirectory called *name* in this directory.

    The generic Directory is read-only; writable subclasses override this.
    """
    raise NotImplementedError
def unlink(self, name):
    """Remove the file called *name* from this directory.

    The generic Directory is read-only; writable subclasses override this.
    """
    raise NotImplementedError
def rmdir(self, name):
    """Remove the subdirectory called *name* from this directory.

    The generic Directory is read-only; writable subclasses override this.
    """
    raise NotImplementedError
def rename(self, name_old, name_new, src):
    """Move entry *name_old* of directory *src* into this directory as *name_new*.

    The generic Directory is read-only; writable subclasses override this.
    """
    raise NotImplementedError
202 class CollectionDirectoryBase(Directory):
203 """Represent an Arvados Collection as a directory.
205 This class is used for Subcollections, and is also the base class for
206 CollectionDirectory, which implements collection loading/saving on
209 Most operations act only the underlying Arvados `Collection` object. The
210 `Collection` object signals via a notify callback to
211 `CollectionDirectoryBase.on_event` that an item was added, removed or
212 modified. FUSE inodes and directory entries are created, deleted or
213 invalidated in response to these events.
def __init__(self, parent_inode, inodes, collection):
    """Wrap *collection* as a FUSE directory.

    :parent_inode: integer inode number of the containing directory
    :inodes: inode table, passed straight through to Directory
    :collection: the Arvados collection or subcollection object backing
      this directory (CollectionDirectory passes None and sets it later)
    """
    # Generic directory state is initialised by Directory.__init__.
    super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
    self.collection = collection
221 def new_entry(self, name, item, mtime):
222 name = sanitize_filename(name)
223 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
224 if item.fuse_entry.dead is not True:
225 raise Exception("Can only reparent dead inode entry")
226 if item.fuse_entry.inode is None:
227 raise Exception("Reparented entry must still have valid inode")
228 item.fuse_entry.dead = False
229 self._entries[name] = item.fuse_entry
230 elif isinstance(item, arvados.collection.RichCollectionBase):
231 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
232 self._entries[name].populate(mtime)
234 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
235 item.fuse_entry = self._entries[name]
def on_event(self, event, collection, name, item):
    """Apply a change notification from the backing Collection.

    populate() subscribes this method on ``self.collection``.  Events whose
    ``collection`` does not compare equal to ``self.collection`` are
    ignored.  ``name`` is passed through sanitize_filename() so it matches
    the keys used in ``self._entries``.

    :event: arvados.collection.ADD, DEL or MOD
    :collection: collection object the event refers to
    :name: name of the affected item within that collection
    :item: the affected file or subcollection object
    """
    # "not ==" rather than "!=": Python 2 does not derive != from __eq__,
    # so this keeps exactly the same comparison as an "if ==" test.
    if not collection == self.collection:
        return

    name = sanitize_filename(name)
    _logger.debug("collection notify %s %s %s %s", event, collection, name, item)

    if event == arvados.collection.ADD:
        self.new_entry(name, item, self.mtime())
    elif event == arvados.collection.DEL:
        # Raises KeyError if no entry of that name exists.
        removed = self._entries.pop(name)
        self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
        self.inodes.del_entry(removed)
    elif event == arvados.collection.MOD:
        # Prefer the inode recorded on the item itself; otherwise fall
        # back to whatever entry we hold under that name, if any.
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            stale_inode = item.fuse_entry.inode
        elif name in self._entries:
            stale_inode = self._entries[name].inode
        else:
            return
        self.inodes.invalidate_inode(stale_inode)
255 def populate(self, mtime):
257 self.collection.subscribe(self.on_event)
258 for entry, item in self.collection.items():
259 self.new_entry(entry, item, self.mtime())
262 return self.collection.writable()
266 with llfuse.lock_released:
267 self.collection.root_collection().save()
271 def create(self, name):
272 with llfuse.lock_released:
273 self.collection.open(name, "w").close()
277 def mkdir(self, name):
278 with llfuse.lock_released:
279 self.collection.mkdirs(name)
283 def unlink(self, name):
284 with llfuse.lock_released:
285 self.collection.remove(name)
290 def rmdir(self, name):
291 with llfuse.lock_released:
292 self.collection.remove(name)
297 def rename(self, name_old, name_new, src):
298 if not isinstance(src, CollectionDirectoryBase):
299 raise llfuse.FUSEError(errno.EPERM)
304 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
306 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
308 raise llfuse.FUSEError(errno.ENOTEMPTY)
309 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
310 raise llfuse.FUSEError(errno.ENOTDIR)
311 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
312 raise llfuse.FUSEError(errno.EISDIR)
314 with llfuse.lock_released:
315 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
320 class CollectionDirectory(CollectionDirectoryBase):
321 """Represents the root of a directory tree representing a collection."""
323 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
324 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
326 self.num_retries = num_retries
327 self.collection_record_file = None
328 self.collection_record = None
329 if isinstance(collection_record, dict):
330 self.collection_locator = collection_record['uuid']
331 self._mtime = convertTime(collection_record.get('modified_at'))
333 self.collection_locator = collection_record
335 self._manifest_size = 0
336 if self.collection_locator:
337 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
338 self._updating_lock = threading.Lock()
341 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
344 return self.collection.writable() if self.collection is not None else self._writable
346 # Used by arv-web.py to switch the contents of the CollectionDirectory
347 def change_collection(self, new_locator):
348 """Switch the contents of the CollectionDirectory.
350 Must be called with llfuse.lock held.
353 self.collection_locator = new_locator
354 self.collection_record = None
357 def new_collection(self, new_collection_record, coll_reader):
359 self.clear(force=True)
361 self.collection_record = new_collection_record
363 if self.collection_record:
364 self._mtime = convertTime(self.collection_record.get('modified_at'))
365 self.collection_locator = self.collection_record["uuid"]
366 if self.collection_record_file is not None:
367 self.collection_record_file.update(self.collection_record)
369 self.collection = coll_reader
370 self.populate(self.mtime())
373 return self.collection_locator
376 def update(self, to_record_version=None):
378 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
381 if self.collection_locator is None:
386 with llfuse.lock_released:
387 self._updating_lock.acquire()
391 _logger.debug("Updating %s", to_record_version)
392 if self.collection is not None:
393 if self.collection.known_past_version(to_record_version):
394 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
396 self.collection.update()
398 if uuid_pattern.match(self.collection_locator):
399 coll_reader = arvados.collection.Collection(
400 self.collection_locator, self.api, self.api.keep,
401 num_retries=self.num_retries)
403 coll_reader = arvados.collection.CollectionReader(
404 self.collection_locator, self.api, self.api.keep,
405 num_retries=self.num_retries)
406 new_collection_record = coll_reader.api_response() or {}
407 # If the Collection only exists in Keep, there will be no API
408 # response. Fill in the fields we need.
409 if 'uuid' not in new_collection_record:
410 new_collection_record['uuid'] = self.collection_locator
411 if "portable_data_hash" not in new_collection_record:
412 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
413 if 'manifest_text' not in new_collection_record:
414 new_collection_record['manifest_text'] = coll_reader.manifest_text()
416 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
417 self.new_collection(new_collection_record, coll_reader)
419 self._manifest_size = len(coll_reader.manifest_text())
420 _logger.debug("%s manifest_size %i", self, self._manifest_size)
421 # end with llfuse.lock_released, re-acquire lock
426 self._updating_lock.release()
427 except arvados.errors.NotFoundError as e:
428 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
429 except arvados.errors.ArgumentError as detail:
430 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
431 if self.collection_record is not None and "manifest_text" in self.collection_record:
432 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
434 _logger.exception("arv-mount %s: error", self.collection_locator)
435 if self.collection_record is not None and "manifest_text" in self.collection_record:
436 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
441 def __getitem__(self, item):
442 if item == '.arvados#collection':
443 if self.collection_record_file is None:
444 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
445 self.inodes.add_entry(self.collection_record_file)
446 return self.collection_record_file
448 return super(CollectionDirectory, self).__getitem__(item)
450 def __contains__(self, k):
451 if k == '.arvados#collection':
454 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Forget the cached collection record, then defer to the base invalidate().

    The ``.arvados#collection`` ObjectFile is dropped along with the record.
    __getitem__ rebuilds that file when it is next requested.
    """
    # Chained assignment binds left to right: the record first, then the file.
    self.collection_record = self.collection_record_file = None
    super(CollectionDirectory, self).invalidate()
462 return (self.collection_locator is not None)
465 # This is an empirically-derived heuristic to estimate the memory used
466 # to store this collection's metadata. Calculating the memory
467 # footprint directly would be more accurate, but also more complicated.
468 return self._manifest_size * 128
471 if self.collection is not None:
473 self.collection.save()
474 self.collection.stop_threads()
477 class MagicDirectory(Directory):
478 """A special directory that logically contains the set of all extant keep locators.
480 When a file is referenced by lookup(), it is tested to see if it is a valid
481 keep locator to a manifest, and if so, loads the manifest contents as a
482 subdirectory of this directory with the locator as the directory name.
483 Since querying a list of all extant keep locators is impractical, only
484 collections that have already been accessed are visible to readdir().
489 This directory provides access to Arvados collections as subdirectories listed
490 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
491 the form '1234567890abcdefghijklmnopqrstuv+123').
493 Note that this directory will appear empty until you attempt to access a
494 specific collection subdirectory (such as trying to 'cd' into it), at which
495 point the collection will actually be looked up on the server and the directory
496 will appear if it exists.
499 def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
500 super(MagicDirectory, self).__init__(parent_inode, inodes)
502 self.num_retries = num_retries
503 self.pdh_only = pdh_only
def __setattr__(self, name, value):
    """Set an attribute, adding the fixed entries once an inode is assigned.

    Whenever ``inode`` is set to a non-None value while the directory has no
    entries, a README file containing README_TEXT is added.  If that inode is
    llfuse.ROOT_INODE, a ``by_id`` MagicDirectory with the same api,
    num_retries and pdh_only settings is added as well.
    """
    super(MagicDirectory, self).__setattr__(name, value)
    # Only react to the inode being assigned, and only while still empty.
    if name != 'inode' or self.inode is None or self._entries:
        return
    self._entries['README'] = self.inodes.add_entry(
        StringFile(self.inode, self.README_TEXT, time.time()))
    # If we're the root directory, add an identical by_id subdirectory.
    # pdh_only must be passed through too.  Otherwise a root created with
    # pdh_only=True exposes a by_id directory that still accepts collection
    # UUIDs (see the check in __contains__), so it would not be identical.
    if self.inode == llfuse.ROOT_INODE:
        self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
            self.inode, self.inodes, self.api, self.num_retries,
            pdh_only=self.pdh_only))
517 def __contains__(self, k):
518 if k in self._entries:
521 if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
525 e = self.inodes.add_entry(CollectionDirectory(
526 self.inode, self.inodes, self.api, self.num_retries, k))
529 if k not in self._entries:
532 self.inodes.del_entry(e)
535 self.inodes.del_entry(e)
537 except Exception as e:
538 _logger.debug('arv-mount exception keep %s', e)
539 self.inodes.del_entry(e)
542 def __getitem__(self, item):
544 return self._entries[item]
546 raise KeyError("No collection with id " + item)
548 def clear(self, force=False):
552 class RecursiveInvalidateDirectory(Directory):
553 def invalidate(self):
555 super(RecursiveInvalidateDirectory, self).invalidate()
556 for a in self._entries:
557 self._entries[a].invalidate()
562 class TagsDirectory(RecursiveInvalidateDirectory):
563 """A special directory that contains as subdirectories all tags visible to the user."""
565 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
566 super(TagsDirectory, self).__init__(parent_inode, inodes)
568 self.num_retries = num_retries
570 self._poll_time = poll_time
574 with llfuse.lock_released:
575 tags = self.api.links().list(
576 filters=[['link_class', '=', 'tag']],
577 select=['name'], distinct=True
578 ).execute(num_retries=self.num_retries)
580 self.merge(tags['items'],
582 lambda a, i: a.tag == i['name'],
583 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
586 class TagDirectory(Directory):
587 """A special directory that contains as subdirectories all collections visible
588 to the user that are tagged with a particular tag.
591 def __init__(self, parent_inode, inodes, api, num_retries, tag,
592 poll=False, poll_time=60):
593 super(TagDirectory, self).__init__(parent_inode, inodes)
595 self.num_retries = num_retries
598 self._poll_time = poll_time
602 with llfuse.lock_released:
603 taggedcollections = self.api.links().list(
604 filters=[['link_class', '=', 'tag'],
605 ['name', '=', self.tag],
606 ['head_uuid', 'is_a', 'arvados#collection']],
608 ).execute(num_retries=self.num_retries)
609 self.merge(taggedcollections['items'],
610 lambda i: i['head_uuid'],
611 lambda a, i: a.collection_locator == i['head_uuid'],
612 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
615 class ProjectDirectory(Directory):
616 """A special directory that contains the contents of a project."""
618 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
619 poll=False, poll_time=60):
620 super(ProjectDirectory, self).__init__(parent_inode, inodes)
622 self.num_retries = num_retries
623 self.project_object = project_object
624 self.project_object_file = None
625 self.project_uuid = project_object['uuid']
627 self._poll_time = poll_time
628 self._updating_lock = threading.Lock()
629 self._current_user = None
631 def createDirectory(self, i):
632 if collection_uuid_pattern.match(i['uuid']):
633 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
634 elif group_uuid_pattern.match(i['uuid']):
635 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
636 elif link_uuid_pattern.match(i['uuid']):
637 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
638 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
641 elif uuid_pattern.match(i['uuid']):
642 return ObjectFile(self.parent_inode, i)
647 return self.project_uuid
651 if self.project_object_file == None:
652 self.project_object_file = ObjectFile(self.inode, self.project_object)
653 self.inodes.add_entry(self.project_object_file)
657 if i['name'] is None or len(i['name']) == 0:
659 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
660 # collection or subproject
662 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
665 elif 'kind' in i and i['kind'].startswith('arvados#'):
667 return "{}.{}".format(i['name'], i['kind'][8:])
672 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
673 return a.uuid() == i['uuid']
674 elif isinstance(a, ObjectFile):
675 return a.uuid() == i['uuid'] and not a.stale()
679 with llfuse.lock_released:
680 self._updating_lock.acquire()
684 if group_uuid_pattern.match(self.project_uuid):
685 self.project_object = self.api.groups().get(
686 uuid=self.project_uuid).execute(num_retries=self.num_retries)
687 elif user_uuid_pattern.match(self.project_uuid):
688 self.project_object = self.api.users().get(
689 uuid=self.project_uuid).execute(num_retries=self.num_retries)
691 contents = arvados.util.list_all(self.api.groups().contents,
692 self.num_retries, uuid=self.project_uuid)
694 # end with llfuse.lock_released, re-acquire lock
699 self.createDirectory)
701 self._updating_lock.release()
705 def __getitem__(self, item):
706 if item == '.arvados#project':
707 return self.project_object_file
709 return super(ProjectDirectory, self).__getitem__(item)
711 def __contains__(self, k):
712 if k == '.arvados#project':
715 return super(ProjectDirectory, self).__contains__(k)
720 with llfuse.lock_released:
721 if not self._current_user:
722 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
723 return self._current_user["uuid"] in self.project_object["writable_by"]
730 def mkdir(self, name):
732 with llfuse.lock_released:
733 self.api.collections().create(body={"owner_uuid": self.project_uuid,
735 "manifest_text": ""}).execute(num_retries=self.num_retries)
737 except apiclient_errors.Error as error:
739 raise llfuse.FUSEError(errno.EEXIST)
743 def rmdir(self, name):
745 raise llfuse.FUSEError(errno.ENOENT)
746 if not isinstance(self[name], CollectionDirectory):
747 raise llfuse.FUSEError(errno.EPERM)
748 if len(self[name]) > 0:
749 raise llfuse.FUSEError(errno.ENOTEMPTY)
750 with llfuse.lock_released:
751 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
756 def rename(self, name_old, name_new, src):
757 if not isinstance(src, ProjectDirectory):
758 raise llfuse.FUSEError(errno.EPERM)
762 if not isinstance(ent, CollectionDirectory):
763 raise llfuse.FUSEError(errno.EPERM)
766 # POSIX semantics for replacing one directory with another is
767 # tricky (the target directory must be empty, the operation must be
768 # atomic which isn't possible with the Arvados API as of this
769 # writing) so don't support that.
770 raise llfuse.FUSEError(errno.EPERM)
772 self.api.collections().update(uuid=ent.uuid(),
773 body={"owner_uuid": self.uuid(),
774 "name": name_new}).execute(num_retries=self.num_retries)
776 # Acually move the entry from source directory to this directory.
777 del src._entries[name_old]
778 self._entries[name_new] = ent
779 self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
782 class SharedDirectory(Directory):
783 """A special directory that represents users or groups who have shared projects with me."""
785 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
786 poll=False, poll_time=60):
787 super(SharedDirectory, self).__init__(parent_inode, inodes)
789 self.num_retries = num_retries
790 self.current_user = api.users().current().execute(num_retries=num_retries)
792 self._poll_time = poll_time
796 with llfuse.lock_released:
797 all_projects = arvados.util.list_all(
798 self.api.groups().list, self.num_retries,
799 filters=[['group_class','=','project']])
801 for ob in all_projects:
802 objects[ob['uuid']] = ob
806 for ob in all_projects:
807 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
809 root_owners[ob['owner_uuid']] = True
811 lusers = arvados.util.list_all(
812 self.api.users().list, self.num_retries,
813 filters=[['uuid','in', list(root_owners)]])
814 lgroups = arvados.util.list_all(
815 self.api.groups().list, self.num_retries,
816 filters=[['uuid','in', list(root_owners)]])
822 objects[l["uuid"]] = l
824 objects[l["uuid"]] = l
827 for r in root_owners:
831 contents[obr["name"]] = obr
832 #elif obr.get("username"):
833 # contents[obr["username"]] = obr
834 elif "first_name" in obr:
835 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
839 if r['owner_uuid'] not in objects:
840 contents[r['name']] = r
842 # end with llfuse.lock_released, re-acquire lock
845 self.merge(contents.items(),
847 lambda a, i: a.uuid() == i[1]['uuid'],
848 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))