9 from apiclient import errors as apiclient_errors
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter, check_update
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# Module-level logger, registered under the name 'arvados.arvados_fuse'.
18 _logger = logging.getLogger('arvados.arvados_fuse')
21 # Match any character which FUSE or Linux cannot accommodate as part
22 # of a filename. (If present in a collection filename, they will
23 # appear as underscores in the fuse mount.)
# The character class below matches exactly two characters: NUL (\x00) and '/'.
24 _disallowed_filename_characters = re.compile('[\x00/]')
26 # '.' and '..' are not reachable if API server is newer than #6277
# Turn a collection entry name into one usable as a FUSE filename: every match
# of the module-level _disallowed_filename_characters pattern in `dirty` is
# replaced with "_" and the resulting string is returned.
# NOTE(review): the embedded source numbering jumps from 28 to 38 here, so
# part of this function's body appears to be absent from this copy -- confirm
# against the complete file before relying on this description for inputs
# such as None or the empty string.
27 def sanitize_filename(dirty):
28 """Replace disallowed filename characters with harmless "_"."""
38 return _disallowed_filename_characters.sub('_', dirty)
# Directory: dict-backed directory node, used as the base class of the other
# directory types in this file (they all derive from it directly or
# indirectly).
#
# What the visible code establishes:
#   * __init__ rejects a non-int parent_inode with a plain Exception, stores
#     it as self.parent_inode and stamps self._mtime with the current time.
#   * __getitem__ and __contains__ delegate straight to the self._entries dict.
#   * merge() reconciles self._entries against a fresh listing using the
#     fn / same / new_entry callbacks described in its docstring.
#   * clear() only acts when the directory is not in use or force is set; it
#     calls clear(force) on children and puts the old entries dict back when a
#     child's clear() returns a false value.
#   * create / mkdir / unlink / rmdir / rename raise NotImplementedError here;
#     subclasses that support an operation override it.
#
# NOTE(review): checkupdate() catches apiclient.errors.HttpError, while the
# import at the top of the file binds the alias `apiclient_errors` (the name
# ProjectDirectory.mkdir uses) -- confirm that the bare `apiclient` module is
# imported as well.
41 class Directory(FreshBase):
42 """Generic directory object, backed by a dict.
44 Consists of a set of entries with the key representing the filename
45 and the value referencing a File or Directory object.
48 def __init__(self, parent_inode, inodes):
49 """parent_inode is the integer inode number"""
51 super(Directory, self).__init__()
54 if not isinstance(parent_inode, int):
55 raise Exception("parent_inode should be an int")
56 self.parent_inode = parent_inode
59 self._mtime = time.time()
61 # Overriden by subclasses to implement logic to update the entries dict
62 # when the directory is stale
67 # Only used when computing the size of the disk footprint of the directory
75 def checkupdate(self):
79 except apiclient.errors.HttpError as e:
84 def __getitem__(self, item):
85 return self._entries[item]
90 return list(self._entries.items())
94 def __contains__(self, k):
95 return k in self._entries
100 return len(self._entries)
103 self.inodes.touch(self)
104 super(Directory, self).fresh()
106 def merge(self, items, fn, same, new_entry):
107 """Helper method for updating the contents of the directory.
109 Takes a list describing the new contents of the directory, reuse
110 entries that are the same in both the old and new lists, create new
111 entries, and delete old entries missing from the new list.
113 :items: iterable with new directory contents
115 :fn: function to take an entry in 'items' and return the desired file or
116 directory name, or None if this entry should be skipped
118 :same: function to compare an existing entry (a File or Directory
119 object) with an entry in the items list to determine whether to keep
122 :new_entry: function to create a new directory entry (File or Directory
123 object) from an entry in the items list.
# Keep a handle on the previous entries so they can be matched against the
# new listing below.
127 oldentries = self._entries
# Names coming from fn() are sanitized before being used as dict keys.
131 name = sanitize_filename(fn(i))
133 if name in oldentries and same(oldentries[name], i):
134 # move existing directory entry over
135 self._entries[name] = oldentries[name]
138 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
139 # create new directory entry
142 self._entries[name] = self.inodes.add_entry(ent)
145 # delete any other directory entries that were not found in 'items'
147 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
# Invalidate the cached name under this directory first, then drop the inode
# entry itself. The name is encoded with the mount's configured encoding.
148 self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
149 self.inodes.del_entry(oldentries[i])
# Invalidate this directory's own inode and refresh its modification time.
153 self.inodes.invalidate_inode(self.inode)
154 self._mtime = time.time()
158 def clear(self, force=False):
159 """Delete all entries"""
# Busy directories are left alone unless the caller forces the clear.
161 if not self.in_use() or force:
162 oldentries = self._entries
# A child that declines to clear causes the previous entries to be restored.
165 if not oldentries[n].clear(force):
166 self._entries = oldentries
169 self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
170 self.inodes.del_entry(oldentries[n])
171 self.inodes.invalidate_inode(self.inode)
# Mutating operations are unsupported on the generic directory; subclasses
# override the ones they implement.
186 def create(self, name):
187 raise NotImplementedError()
189 def mkdir(self, name):
190 raise NotImplementedError()
192 def unlink(self, name):
193 raise NotImplementedError()
195 def rmdir(self, name):
196 raise NotImplementedError()
198 def rename(self, name_old, name_new, src):
199 raise NotImplementedError()
# CollectionDirectoryBase: directory view over an Arvados collection object
# (self.collection), kept in sync through the collection's notify callback.
#
# What the visible code establishes:
#   * new_entry(name, item, mtime) sanitizes the name, then:
#       - if the item already carries a non-None `fuse_entry`, that entry is
#         re-attached here; this is only allowed when the entry is marked
#         dead and still has an inode, otherwise a plain Exception is raised;
#       - if the item is an arvados.collection.RichCollectionBase, a nested
#         CollectionDirectoryBase is registered and populated recursively;
#       - the remaining branch registers a FuseArvadosFile for the item.
#     The item's `fuse_entry` attribute is then pointed at the stored entry.
#   * on_event() only reacts when the notifying collection equals
#     self.collection. ADD creates an entry; DEL removes the entry from
#     self._entries, invalidates the name and deletes the inode entry; MOD
#     invalidates the inode of item.fuse_entry or, failing that, of the entry
#     stored under that name.
#   * populate() subscribes on_event to the collection and creates an entry
#     for every item currently in it.
#   * create / mkdir / unlink / rmdir forward to the collection
#     (open(name, "w").close(), mkdirs, remove, remove) with the llfuse lock
#     released for the duration of the call.
#   * rename() raises EPERM unless the source directory is also a
#     CollectionDirectoryBase; a directory-over-file rename raises ENOTDIR and
#     a file-over-directory rename raises EISDIR; ENOTEMPTY is raised in the
#     directory-over-directory branch (the guarding condition is not visible
#     in this copy -- presumably a non-empty target; confirm). Otherwise the
#     collection's rename() is called with overwrite=True.
202 class CollectionDirectoryBase(Directory):
203 """Represent an Arvados Collection as a directory.
205 This class is used for Subcollections, and is also the base class for
206 CollectionDirectory, which implements collection loading/saving on
209 Most operations act only the underlying Arvados `Collection` object. The
210 `Collection` object signals via a notify callback to
211 `CollectionDirectoryBase.on_event` that an item was added, removed or
212 modified. FUSE inodes and directory entries are created, deleted or
213 invalidated in response to these events.
217 def __init__(self, parent_inode, inodes, collection):
218 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
219 self.collection = collection
221 def new_entry(self, name, item, mtime):
222 name = sanitize_filename(name)
223 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
224 if item.fuse_entry.dead is not True:
225 raise Exception("Can only reparent dead inode entry")
226 if item.fuse_entry.inode is None:
227 raise Exception("Reparented entry must still have valid inode")
228 item.fuse_entry.dead = False
229 self._entries[name] = item.fuse_entry
230 elif isinstance(item, arvados.collection.RichCollectionBase):
231 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
232 self._entries[name].populate(mtime)
234 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
235 item.fuse_entry = self._entries[name]
237 def on_event(self, event, collection, name, item):
238 if collection == self.collection:
239 name = sanitize_filename(name)
240 _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
242 if event == arvados.collection.ADD:
243 self.new_entry(name, item, self.mtime())
244 elif event == arvados.collection.DEL:
245 ent = self._entries[name]
246 del self._entries[name]
247 self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
248 self.inodes.del_entry(ent)
249 elif event == arvados.collection.MOD:
250 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
251 self.inodes.invalidate_inode(item.fuse_entry.inode)
252 elif name in self._entries:
253 self.inodes.invalidate_inode(self._entries[name].inode)
255 def populate(self, mtime):
257 self.collection.subscribe(self.on_event)
258 for entry, item in self.collection.items():
259 self.new_entry(entry, item, self.mtime())
262 return self.collection.writable()
266 with llfuse.lock_released:
267 self.collection.root_collection().save()
271 def create(self, name):
272 with llfuse.lock_released:
273 self.collection.open(name, "w").close()
277 def mkdir(self, name):
278 with llfuse.lock_released:
279 self.collection.mkdirs(name)
283 def unlink(self, name):
284 with llfuse.lock_released:
285 self.collection.remove(name)
290 def rmdir(self, name):
291 with llfuse.lock_released:
292 self.collection.remove(name)
297 def rename(self, name_old, name_new, src):
298 if not isinstance(src, CollectionDirectoryBase):
299 raise llfuse.FUSEError(errno.EPERM)
304 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
306 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
308 raise llfuse.FUSEError(errno.ENOTEMPTY)
309 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
310 raise llfuse.FUSEError(errno.ENOTDIR)
311 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
312 raise llfuse.FUSEError(errno.EISDIR)
314 with llfuse.lock_released:
315 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
# CollectionDirectory: root directory of one collection, identified by
# self.collection_locator (a uuid or a portable data hash).
#
# What the visible code establishes:
#   * The base class is initialised with collection=None; the collection
#     object is attached later by new_collection().
#   * update() builds an arvados.collection.Collection for locators matching
#     uuid_pattern and an arvados.collection.CollectionReader otherwise, fills
#     in 'uuid', 'portable_data_hash' and 'manifest_text' when the API
#     response lacks them, and calls new_collection() when there is no current
#     record or the portable data hash differs. When a collection object
#     already exists it instead checks known_past_version(to_record_version)
#     and calls collection.update(). NotFoundError is logged at error level,
#     ArgumentError at warning level, and the remaining handler uses
#     _logger.exception; the latter two also log the manifest text when the
#     current record has one.
#   * new_collection() force-clears the directory, stores the record, refreshes
#     _mtime and collection_locator from it, updates the
#     '.arvados#collection' file if one exists, and repopulates the entries.
#   * __getitem__('.arvados#collection') lazily creates an ObjectFile wrapping
#     collection_record and registers it with self.inodes.
#   * invalidate() drops collection_record and collection_record_file before
#     delegating to the base class.
#   * The memory-footprint estimate returned near the end is
#     self._manifest_size * 128 (an empirical heuristic per the comment there).
320 class CollectionDirectory(CollectionDirectoryBase):
321 """Represents the root of a directory tree representing a collection."""
323 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
324 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
326 self.num_retries = num_retries
327 self.collection_record_file = None
328 self.collection_record = None
# collection_record may be a full API record (dict), in which case its 'uuid'
# becomes the locator and its 'modified_at' the mtime; the assignment on the
# line numbered 333 uses the argument itself as the locator.
329 if isinstance(collection_record, dict):
330 self.collection_locator = collection_record['uuid']
331 self._mtime = convertTime(collection_record.get('modified_at'))
333 self.collection_locator = collection_record
335 self._manifest_size = 0
# Until a collection object is loaded, writability is inferred from the
# locator: True when it matches uuid_pattern (see the fallback in the
# `self.collection.writable() if ... else self._writable` expression below).
336 if self.collection_locator:
337 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
338 self._updating_lock = threading.Lock()
# An item matches this directory when either its 'uuid' or its
# 'portable_data_hash' equals the locator.
341 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
344 return self.collection.writable() if self.collection is not None else self._writable
346 # Used by arv-web.py to switch the contents of the CollectionDirectory
347 def change_collection(self, new_locator):
348 """Switch the contents of the CollectionDirectory.
350 Must be called with llfuse.lock held.
353 self.collection_locator = new_locator
354 self.collection_record = None
357 def new_collection(self, new_collection_record, coll_reader):
359 self.clear(force=True)
361 self.collection_record = new_collection_record
363 if self.collection_record:
364 self._mtime = convertTime(self.collection_record.get('modified_at'))
365 self.collection_locator = self.collection_record["uuid"]
366 if self.collection_record_file is not None:
367 self.collection_record_file.update(self.collection_record)
369 self.collection = coll_reader
370 self.populate(self.mtime())
373 return self.collection_locator
376 def update(self, to_record_version=None):
378 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
381 if self.collection_locator is None:
386 with llfuse.lock_released:
387 self._updating_lock.acquire()
391 _logger.debug("Updating %s", to_record_version)
392 if self.collection is not None:
393 if self.collection.known_past_version(to_record_version):
394 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
396 self.collection.update()
398 if uuid_pattern.match(self.collection_locator):
399 coll_reader = arvados.collection.Collection(
400 self.collection_locator, self.api, self.api.keep,
401 num_retries=self.num_retries)
403 coll_reader = arvados.collection.CollectionReader(
404 self.collection_locator, self.api, self.api.keep,
405 num_retries=self.num_retries)
406 new_collection_record = coll_reader.api_response() or {}
407 # If the Collection only exists in Keep, there will be no API
408 # response. Fill in the fields we need.
409 if 'uuid' not in new_collection_record:
410 new_collection_record['uuid'] = self.collection_locator
411 if "portable_data_hash" not in new_collection_record:
412 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
413 if 'manifest_text' not in new_collection_record:
414 new_collection_record['manifest_text'] = coll_reader.manifest_text()
416 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
417 self.new_collection(new_collection_record, coll_reader)
419 self._manifest_size = len(coll_reader.manifest_text())
420 _logger.debug("%s manifest_size %i", self, self._manifest_size)
421 # end with llfuse.lock_released, re-acquire lock
426 self._updating_lock.release()
427 except arvados.errors.NotFoundError as e:
428 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
429 except arvados.errors.ArgumentError as detail:
430 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
431 if self.collection_record is not None and "manifest_text" in self.collection_record:
432 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
434 _logger.exception("arv-mount %s: error", self.collection_locator)
435 if self.collection_record is not None and "manifest_text" in self.collection_record:
436 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
441 def __getitem__(self, item):
442 if item == '.arvados#collection':
443 if self.collection_record_file is None:
444 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
445 self.inodes.add_entry(self.collection_record_file)
446 return self.collection_record_file
448 return super(CollectionDirectory, self).__getitem__(item)
450 def __contains__(self, k):
451 if k == '.arvados#collection':
454 return super(CollectionDirectory, self).__contains__(k)
456 def invalidate(self):
457 self.collection_record = None
458 self.collection_record_file = None
459 super(CollectionDirectory, self).invalidate()
462 return (self.collection_locator is not None)
465 # This is an empirically-derived heuristic to estimate the memory used
466 # to store this collection's metadata. Calculating the memory
467 # footprint directly would be more accurate, but also more complicated.
468 return self._manifest_size * 128
471 if self.collection is not None:
473 self.collection.save()
474 self.collection.stop_threads()
477 class MagicDirectory(Directory):
478 """A special directory that logically contains the set of all extant keep locators.
480 When a file is referenced by lookup(), it is tested to see if it is a valid
481 keep locator to a manifest, and if so, loads the manifest contents as a
482 subdirectory of this directory with the locator as the directory name.
483 Since querying a list of all extant keep locators is impractical, only
484 collections that have already been accessed are visible to readdir().
489 This directory provides access to Arvados collections as subdirectories listed
490 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
491 the form '1234567890abcdefghijklmnopqrstuv+123').
493 Note that this directory will appear empty until you attempt to access a
494 specific collection subdirectory (such as trying to 'cd' into it), at which
495 point the collection will actually be looked up on the server and the directory
496 will appear if it exists.
499 def __init__(self, parent_inode, inodes, api, num_retries):
500 super(MagicDirectory, self).__init__(parent_inode, inodes)
502 self.num_retries = num_retries
504 def __setattr__(self, name, value):
505 super(MagicDirectory, self).__setattr__(name, value)
506 # When we're assigned an inode, add a README.
507 if ((name == 'inode') and (self.inode is not None) and
508 (not self._entries)):
509 self._entries['README'] = self.inodes.add_entry(
510 StringFile(self.inode, self.README_TEXT, time.time()))
511 # If we're the root directory, add an identical by_id subdirectory.
512 if self.inode == llfuse.ROOT_INODE:
513 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
514 self.inode, self.inodes, self.api, self.num_retries))
516 def __contains__(self, k):
517 if k in self._entries:
520 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
524 e = self.inodes.add_entry(CollectionDirectory(
525 self.inode, self.inodes, self.api, self.num_retries, k))
528 if k not in self._entries:
531 self.inodes.del_entry(e)
534 self.inodes.del_entry(e)
536 except Exception as e:
537 _logger.debug('arv-mount exception keep %s', e)
538 self.inodes.del_entry(e)
541 def __getitem__(self, item):
543 return self._entries[item]
545 raise KeyError("No collection with id " + item)
547 def clear(self, force=False):
# RecursiveInvalidateDirectory: Directory variant whose invalidate() cascades.
# It first invalidates itself through the base class and then calls
# invalidate() on every object stored in self._entries.
551 class RecursiveInvalidateDirectory(Directory):
552 def invalidate(self):
554 super(RecursiveInvalidateDirectory, self).invalidate()
# Iterating the dict yields entry names; each entry object is looked up and
# invalidated in turn.
555 for a in self._entries:
556 self._entries[a].invalidate()
# TagsDirectory: lists the distinct names of all links whose link_class is
# 'tag' and exposes one TagDirectory per name. Because it derives from
# RecursiveInvalidateDirectory, invalidating it also invalidates its children.
561 class TagsDirectory(RecursiveInvalidateDirectory):
562 """A special directory that contains as subdirectories all tags visible to the user."""
564 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
565 super(TagsDirectory, self).__init__(parent_inode, inodes)
567 self.num_retries = num_retries
569 self._poll_time = poll_time
# The API request runs with the llfuse lock released. Only the 'name' field
# is selected, with distinct=True, so each tag name is returned once.
573 with llfuse.lock_released:
574 tags = self.api.links().list(
575 filters=[['link_class', '=', 'tag']],
576 select=['name'], distinct=True
577 ).execute(num_retries=self.num_retries)
# Reconcile entries with the listing: an existing entry is kept when its .tag
# equals the listed name; otherwise a new TagDirectory is created for it.
579 self.merge(tags['items'],
581 lambda a, i: a.tag == i['name'],
582 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
# TagDirectory: one directory per tag name (self.tag). Its update logic lists
# links with link_class 'tag', name equal to self.tag and a head_uuid that is
# an arvados#collection, with the llfuse lock released during the request.
# The result is merged into the directory: entries are named after the link's
# head_uuid, an existing entry is kept when its collection_locator equals that
# head_uuid, and new entries are CollectionDirectory objects built from it.
585 class TagDirectory(Directory):
586 """A special directory that contains as subdirectories all collections visible
587 to the user that are tagged with a particular tag.
590 def __init__(self, parent_inode, inodes, api, num_retries, tag,
591 poll=False, poll_time=60):
592 super(TagDirectory, self).__init__(parent_inode, inodes)
594 self.num_retries = num_retries
597 self._poll_time = poll_time
601 with llfuse.lock_released:
602 taggedcollections = self.api.links().list(
603 filters=[['link_class', '=', 'tag'],
604 ['name', '=', self.tag],
605 ['head_uuid', 'is_a', 'arvados#collection']],
607 ).execute(num_retries=self.num_retries)
608 self.merge(taggedcollections['items'],
609 lambda i: i['head_uuid'],
610 lambda a, i: a.collection_locator == i['head_uuid'],
611 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
# ProjectDirectory: directory view of one project, identified by
# self.project_uuid (taken from project_object['uuid']).
#
# What the visible code establishes:
#   * createDirectory() maps a listed API object to a directory entry by the
#     pattern its uuid matches: collections become CollectionDirectory,
#     groups become nested ProjectDirectory, links pointing at a collection
#     (by head_kind or a portable-data-hash head_uuid) become
#     CollectionDirectory on the link target, and other uuids become
#     ObjectFile.
#   * The update logic refreshes project_object from groups().get() or
#     users().get() depending on which pattern project_uuid matches, then
#     lists the contents through arvados.util.list_all, all while holding
#     self._updating_lock.
#   * Writability is decided by whether the current user's uuid appears in
#     project_object["writable_by"]; the current user is fetched once and
#     cached in self._current_user.
#   * mkdir() creates an empty-manifest collection owned by this project and
#     can raise EEXIST from the apiclient error handler.
#   * rmdir() only removes CollectionDirectory entries (EPERM otherwise) that
#     are empty (ENOTEMPTY otherwise) by deleting the collection via the API.
#   * rename() only moves CollectionDirectory entries between
#     ProjectDirectory instances (EPERM otherwise), by updating the
#     collection's owner_uuid and name through the API and then moving the
#     entry between the two _entries dicts.
614 class ProjectDirectory(Directory):
615 """A special directory that contains the contents of a project."""
617 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
618 poll=False, poll_time=60):
619 super(ProjectDirectory, self).__init__(parent_inode, inodes)
621 self.num_retries = num_retries
622 self.project_object = project_object
# Created lazily; backs the '.arvados#project' pseudo-file.
623 self.project_object_file = None
624 self.project_uuid = project_object['uuid']
626 self._poll_time = poll_time
627 self._updating_lock = threading.Lock()
# Filled in on first use by the writability check.
628 self._current_user = None
630 def createDirectory(self, i):
631 if collection_uuid_pattern.match(i['uuid']):
632 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
633 elif group_uuid_pattern.match(i['uuid']):
634 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
635 elif link_uuid_pattern.match(i['uuid']):
636 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
637 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
# NOTE(review): this branch passes self.parent_inode, whereas the branches
# above pass self.inode as the parent -- confirm this is intentional.
640 elif uuid_pattern.match(i['uuid']):
641 return ObjectFile(self.parent_inode, i)
646 return self.project_uuid
650 if self.project_object_file == None:
651 self.project_object_file = ObjectFile(self.inode, self.project_object)
652 self.inodes.add_entry(self.project_object_file)
# Naming rules for listed objects follow; the last visible rule appends the
# object's kind with its 8-character 'arvados#' prefix removed, giving
# "<name>.<kind>".
656 if i['name'] is None or len(i['name']) == 0:
658 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
659 # collection or subproject
661 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
664 elif 'kind' in i and i['kind'].startswith('arvados#'):
666 return "{}.{}".format(i['name'], i['kind'][8:])
# Entry/listing comparison: directories match on uuid alone, while ObjectFile
# entries must additionally not be stale.
671 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
672 return a.uuid() == i['uuid']
673 elif isinstance(a, ObjectFile):
674 return a.uuid() == i['uuid'] and not a.stale()
678 with llfuse.lock_released:
679 self._updating_lock.acquire()
683 if group_uuid_pattern.match(self.project_uuid):
684 self.project_object = self.api.groups().get(
685 uuid=self.project_uuid).execute(num_retries=self.num_retries)
686 elif user_uuid_pattern.match(self.project_uuid):
687 self.project_object = self.api.users().get(
688 uuid=self.project_uuid).execute(num_retries=self.num_retries)
690 contents = arvados.util.list_all(self.api.groups().contents,
691 self.num_retries, uuid=self.project_uuid)
693 # end with llfuse.lock_released, re-acquire lock
698 self.createDirectory)
700 self._updating_lock.release()
704 def __getitem__(self, item):
705 if item == '.arvados#project':
706 return self.project_object_file
708 return super(ProjectDirectory, self).__getitem__(item)
710 def __contains__(self, k):
711 if k == '.arvados#project':
714 return super(ProjectDirectory, self).__contains__(k)
719 with llfuse.lock_released:
720 if not self._current_user:
721 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
722 return self._current_user["uuid"] in self.project_object["writable_by"]
729 def mkdir(self, name):
731 with llfuse.lock_released:
732 self.api.collections().create(body={"owner_uuid": self.project_uuid,
734 "manifest_text": ""}).execute(num_retries=self.num_retries)
736 except apiclient_errors.Error as error:
738 raise llfuse.FUSEError(errno.EEXIST)
742 def rmdir(self, name):
744 raise llfuse.FUSEError(errno.ENOENT)
745 if not isinstance(self[name], CollectionDirectory):
746 raise llfuse.FUSEError(errno.EPERM)
747 if len(self[name]) > 0:
748 raise llfuse.FUSEError(errno.ENOTEMPTY)
749 with llfuse.lock_released:
750 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
755 def rename(self, name_old, name_new, src):
756 if not isinstance(src, ProjectDirectory):
757 raise llfuse.FUSEError(errno.EPERM)
761 if not isinstance(ent, CollectionDirectory):
762 raise llfuse.FUSEError(errno.EPERM)
765 # POSIX semantics for replacing one directory with another is
766 # tricky (the target directory must be empty, the operation must be
767 # atomic which isn't possible with the Arvados API as of this
768 # writing) so don't support that.
769 raise llfuse.FUSEError(errno.EPERM)
771 self.api.collections().update(uuid=ent.uuid(),
772 body={"owner_uuid": self.uuid(),
773 "name": name_new}).execute(num_retries=self.num_retries)
775 # Actually move the entry from source directory to this directory.
776 del src._entries[name_old]
777 self._entries[name_new] = ent
778 self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
# SharedDirectory: top-level view of projects shared with the current user.
#
# What the visible code establishes:
#   * The constructor performs an API call (users().current()) to record the
#     current user.
#   * The update logic lists every group with group_class 'project' and
#     indexes them by uuid. Owners that are neither the current user nor one
#     of the listed projects are collected as "root owners", and the matching
#     user and group records are fetched and added to the same index.
#   * Directory names come from the owner record's "name", or from
#     "first_name last_name" when the record has a "first_name" key.
#     Projects whose owner is not in the index are listed under their own
#     name.
#   * The result is merged into the directory: an existing entry is kept when
#     its uuid() equals the listed object's uuid, and new entries are
#     ProjectDirectory objects.
781 class SharedDirectory(Directory):
782 """A special directory that represents users or groups who have shared projects with me."""
784 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
785 poll=False, poll_time=60):
786 super(SharedDirectory, self).__init__(parent_inode, inodes)
788 self.num_retries = num_retries
789 self.current_user = api.users().current().execute(num_retries=num_retries)
791 self._poll_time = poll_time
# All API traffic below happens with the llfuse lock released.
795 with llfuse.lock_released:
796 all_projects = arvados.util.list_all(
797 self.api.groups().list, self.num_retries,
798 filters=[['group_class','=','project']])
# Index every project by uuid.
800 for ob in all_projects:
801 objects[ob['uuid']] = ob
# An owner that is neither the current user nor a listed project is recorded
# as a root owner; the dict is used as a set, with True as a placeholder value.
805 for ob in all_projects:
806 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
808 root_owners[ob['owner_uuid']] = True
# The same uuid list is queried against both users and groups, since an owner
# uuid may refer to either kind of record.
810 lusers = arvados.util.list_all(
811 self.api.users().list, self.num_retries,
812 filters=[['uuid','in', list(root_owners)]])
813 lgroups = arvados.util.list_all(
814 self.api.groups().list, self.num_retries,
815 filters=[['uuid','in', list(root_owners)]])
821 objects[l["uuid"]] = l
823 objects[l["uuid"]] = l
826 for r in root_owners:
830 contents[obr["name"]] = obr
831 #elif obr.get("username"):
832 # contents[obr["username"]] = obr
833 elif "first_name" in obr:
834 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
838 if r['owner_uuid'] not in objects:
839 contents[r['name']] = r
841 # end with llfuse.lock_released, re-acquire lock
# merge() receives (name, object) pairs, so the callbacks index i[1] to reach
# the API object.
844 self.merge(contents.items(),
846 lambda a, i: a.uuid() == i[1]['uuid'],
847 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))