2 # FUSE driver for Arvados Keep
25 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
27 _logger = logging.getLogger('arvados.arvados_fuse')
29 # Match any character which FUSE or Linux cannot accommodate as part
30 # of a filename. (If present in a collection filename, they will
31 # appear as underscores in the fuse mount.)
32 _disallowed_filename_characters = re.compile('[\x00/]')
34 class SafeApi(object):
35 """Threadsafe wrapper for API object.
37 This stores and returns a different api object per thread, because
38 httplib2 which underlies apiclient is not threadsafe.
def __init__(self, config):
    """Capture API connection settings and set up per-thread storage.

    config must provide get(key) and flag_is_true(key); the host, token
    and "insecure" flag are read from it once, here.
    """
    # Read the three ARVADOS_* settings up front, in a fixed order.
    host = config.get('ARVADOS_API_HOST')
    token = config.get('ARVADOS_API_TOKEN')
    insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
    self.host, self.api_token, self.insecure = host, token, insecure

    # Holder for the client objects that are created lazily per thread.
    self.local = threading.local()
    # A single block cache, handed to every KeepClient this wrapper builds.
    self.block_cache = arvados.KeepBlockCache()
49 if 'api' not in self.local.__dict__:
50 self.local.api = arvados.api(
52 host=self.host, token=self.api_token, insecure=self.insecure)
56 if 'keep' not in self.local.__dict__:
57 self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
58 return self.local.keep
60 def __getattr__(self, name):
61 # Proxy nonexistent attributes to the local API client.
63 return getattr(self.localapi(), name)
64 except AttributeError:
65 return super(SafeApi, self).__getattr__(name)
69 """Parse Arvados timestamp to unix time."""
73 return calendar.timegm(ciso8601.parse_datetime_unaware(t).timetuple())
74 except (TypeError, ValueError):
77 def sanitize_filename(dirty):
78 '''Replace disallowed filename characters with harmless "_".'''
88 return _disallowed_filename_characters.sub('_', dirty)
91 class FreshBase(object):
92 '''Base class for maintaining fresh/stale state to determine when to update.'''
96 self._last_update = time.time()
97 self._atime = time.time()
100 # Mark the value as stale
101 def invalidate(self):
104 # Test if the entries dict is stale.
109 return (self._last_update + self._poll_time) < self._atime
114 self._last_update = time.time()
119 class File(FreshBase):
120 '''Base for file objects.'''
122 def __init__(self, parent_inode, _mtime=0):
123 super(File, self).__init__()
125 self.parent_inode = parent_inode
131 def readfrom(self, off, size):
138 class StreamReaderFile(File):
139 '''Wraps a StreamFileReader as a file.'''
141 def __init__(self, parent_inode, reader, _mtime):
142 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
146 return self.reader.size()
def readfrom(self, off, size):
    '''Read size units starting at offset off by delegating to the wrapped reader.'''
    read = self.reader.readfrom
    return read(off, size)
155 class StringFile(File):
156 '''Wrap a simple string as a file'''
157 def __init__(self, parent_inode, contents, _mtime):
158 super(StringFile, self).__init__(parent_inode, _mtime)
159 self.contents = contents
162 return len(self.contents)
def readfrom(self, off, size):
    '''Return the slice of the wrapped contents covering [off, off + size).'''
    end = off + size
    return self.contents[off:end]
168 class ObjectFile(StringFile):
169 '''Wrap a dict as a serialized json object.'''
171 def __init__(self, parent_inode, obj):
172 super(ObjectFile, self).__init__(parent_inode, "", 0)
173 self.uuid = obj['uuid']
176 def update(self, obj):
177 self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
178 self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
181 class Directory(FreshBase):
182 '''Generic directory object, backed by a dict.
183 Consists of a set of entries with the key representing the filename
184 and the value referencing a File or Directory object.
187 def __init__(self, parent_inode):
188 super(Directory, self).__init__()
190 '''parent_inode is the integer inode number'''
192 if not isinstance(parent_inode, int):
193 raise Exception("parent_inode should be an int")
194 self.parent_inode = parent_inode
196 self._mtime = time.time()
198 # Overridden by subclasses to implement logic to update the entries dict
199 # when the directory is stale
203 # Only used when computing the size of the disk footprint of the directory
208 def checkupdate(self):
212 except apiclient.errors.HttpError as e:
215 def __getitem__(self, item):
217 return self._entries[item]
221 return self._entries.items()
225 return self._entries.iterkeys()
227 def __contains__(self, k):
229 return k in self._entries
231 def merge(self, items, fn, same, new_entry):
232 '''Helper method for updating the contents of the directory. Takes a list
233 describing the new contents of the directory, reuse entries that are
234 the same in both the old and new lists, create new entries, and delete
235 old entries missing from the new list.
237 items: iterable with new directory contents
239 fn: function to take an entry in 'items' and return the desired file or
240 directory name, or None if this entry should be skipped
242 same: function to compare an existing entry (a File or Directory
243 object) with an entry in the items list to determine whether to keep
246 new_entry: function to create a new directory entry (File or Directory
247 object) from an entry in the items list.
251 oldentries = self._entries
255 name = sanitize_filename(fn(i))
257 if name in oldentries and same(oldentries[name], i):
258 # move existing directory entry over
259 self._entries[name] = oldentries[name]
262 # create new directory entry
265 self._entries[name] = self.inodes.add_entry(ent)
268 # delete any other directory entries that were not found in 'items'
270 llfuse.invalidate_entry(self.inode, str(i))
271 self.inodes.del_entry(oldentries[i])
275 self._mtime = time.time()
280 '''Delete all entries'''
281 oldentries = self._entries
284 if isinstance(n, Directory):
286 llfuse.invalidate_entry(self.inode, str(n))
287 self.inodes.del_entry(oldentries[n])
288 llfuse.invalidate_inode(self.inode)
295 class CollectionDirectory(Directory):
296 '''Represents the root of a directory tree holding a collection.'''
298 def __init__(self, parent_inode, inodes, api, num_retries, collection):
299 super(CollectionDirectory, self).__init__(parent_inode)
302 self.num_retries = num_retries
303 self.collection_object_file = None
304 self.collection_object = None
305 if isinstance(collection, dict):
306 self.collection_locator = collection['uuid']
307 self._mtime = convertTime(collection.get('modified_at'))
309 self.collection_locator = collection
313 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
315 # Used by arv-web.py to switch the contents of the CollectionDirectory
316 def change_collection(self, new_locator):
317 """Switch the contents of the CollectionDirectory. Must be called with llfuse.lock held."""
318 self.collection_locator = new_locator
319 self.collection_object = None
322 def new_collection(self, new_collection_object, coll_reader):
323 self.collection_object = new_collection_object
325 self._mtime = convertTime(self.collection_object.get('modified_at'))
327 if self.collection_object_file is not None:
328 self.collection_object_file.update(self.collection_object)
331 for s in coll_reader.all_streams():
333 for part in s.name().split('/'):
334 if part != '' and part != '.':
335 partname = sanitize_filename(part)
336 if partname not in cwd._entries:
337 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
338 cwd = cwd._entries[partname]
339 for k, v in s.files().items():
340 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
344 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
347 if self.collection_locator is None:
351 with llfuse.lock_released:
352 coll_reader = arvados.CollectionReader(
353 self.collection_locator, self.api, self.api.localkeep(),
354 num_retries=self.num_retries)
355 new_collection_object = coll_reader.api_response() or {}
356 # If the Collection only exists in Keep, there will be no API
357 # response. Fill in the fields we need.
358 if 'uuid' not in new_collection_object:
359 new_collection_object['uuid'] = self.collection_locator
360 if "portable_data_hash" not in new_collection_object:
361 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
362 if 'manifest_text' not in new_collection_object:
363 new_collection_object['manifest_text'] = coll_reader.manifest_text()
364 coll_reader.normalize()
365 # end with llfuse.lock_released, re-acquire lock
367 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
368 self.new_collection(new_collection_object, coll_reader)
372 except arvados.errors.NotFoundError:
373 _logger.exception("arv-mount %s: error", self.collection_locator)
374 except arvados.errors.ArgumentError as detail:
375 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
376 if self.collection_object is not None and "manifest_text" in self.collection_object:
377 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
379 _logger.exception("arv-mount %s: error", self.collection_locator)
380 if self.collection_object is not None and "manifest_text" in self.collection_object:
381 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
384 def __getitem__(self, item):
386 if item == '.arvados#collection':
387 if self.collection_object_file is None:
388 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
389 self.inodes.add_entry(self.collection_object_file)
390 return self.collection_object_file
392 return super(CollectionDirectory, self).__getitem__(item)
394 def __contains__(self, k):
395 if k == '.arvados#collection':
398 return super(CollectionDirectory, self).__contains__(k)
401 class MagicDirectory(Directory):
402 '''A special directory that logically contains the set of all extant keep
403 locators. When a file is referenced by lookup(), it is tested to see if it
404 is a valid keep locator to a manifest, and if so, loads the manifest
405 contents as a subdirectory of this directory with the locator as the
406 directory name. Since querying a list of all extant keep locators is
407 impractical, only collections that have already been accessed are visible
412 This directory provides access to Arvados collections as subdirectories listed
413 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
414 the form '1234567890abcdefghijklmnopqrstuv+123').
416 Note that this directory will appear empty until you attempt to access a
417 specific collection subdirectory (such as trying to 'cd' into it), at which
418 point the collection will actually be looked up on the server and the directory
419 will appear if it exists.
422 def __init__(self, parent_inode, inodes, api, num_retries):
423 super(MagicDirectory, self).__init__(parent_inode)
426 self.num_retries = num_retries
def __setattr__(self, name, value):
    '''Set the attribute, then populate the directory once it has an inode.

    Nothing extra happens unless the attribute being set is 'inode', the
    new inode is not None, and the directory has no entries yet.
    '''
    super(MagicDirectory, self).__setattr__(name, value)

    if name != 'inode' or self.inode is None or self._entries:
        return

    # When we're assigned an inode, add a README.
    self._entries['README'] = self.inodes.add_entry(
        StringFile(self.inode, self.README_TEXT, time.time()))

    if self.inode != llfuse.ROOT_INODE:
        return

    # If we're the root directory, add an identical by_id subdirectory.
    self._entries['by_id'] = self.inodes.add_entry(
        MagicDirectory(self.inode, self.inodes, self.api, self.num_retries))
440 def __contains__(self, k):
441 if k in self._entries:
444 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
448 e = self.inodes.add_entry(CollectionDirectory(
449 self.inode, self.inodes, self.api, self.num_retries, k))
455 except Exception as e:
456 _logger.debug('arv-mount exception keep %s', e)
459 def __getitem__(self, item):
461 return self._entries[item]
463 raise KeyError("No collection with id " + item)
466 class RecursiveInvalidateDirectory(Directory):
467 def invalidate(self):
468 if self.inode == llfuse.ROOT_INODE:
469 llfuse.lock.acquire()
471 super(RecursiveInvalidateDirectory, self).invalidate()
472 for a in self._entries:
473 self._entries[a].invalidate()
477 if self.inode == llfuse.ROOT_INODE:
478 llfuse.lock.release()
481 class TagsDirectory(RecursiveInvalidateDirectory):
482 '''A special directory that contains as subdirectories all tags visible to the user.'''
484 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
485 super(TagsDirectory, self).__init__(parent_inode)
488 self.num_retries = num_retries
490 self._poll_time = poll_time
493 with llfuse.lock_released:
494 tags = self.api.links().list(
495 filters=[['link_class', '=', 'tag']],
496 select=['name'], distinct=True
497 ).execute(num_retries=self.num_retries)
499 self.merge(tags['items'],
501 lambda a, i: a.tag == i['name'],
502 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
505 class TagDirectory(Directory):
506 '''A special directory that contains as subdirectories all collections visible
507 to the user that are tagged with a particular tag.
510 def __init__(self, parent_inode, inodes, api, num_retries, tag,
511 poll=False, poll_time=60):
512 super(TagDirectory, self).__init__(parent_inode)
515 self.num_retries = num_retries
518 self._poll_time = poll_time
521 with llfuse.lock_released:
522 taggedcollections = self.api.links().list(
523 filters=[['link_class', '=', 'tag'],
524 ['name', '=', self.tag],
525 ['head_uuid', 'is_a', 'arvados#collection']],
527 ).execute(num_retries=self.num_retries)
528 self.merge(taggedcollections['items'],
529 lambda i: i['head_uuid'],
530 lambda a, i: a.collection_locator == i['head_uuid'],
531 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
534 class ProjectDirectory(Directory):
535 '''A special directory that contains the contents of a project.'''
537 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
538 poll=False, poll_time=60):
539 super(ProjectDirectory, self).__init__(parent_inode)
542 self.num_retries = num_retries
543 self.project_object = project_object
544 self.project_object_file = None
545 self.uuid = project_object['uuid']
547 self._poll_time = poll_time
549 def createDirectory(self, i):
550 if collection_uuid_pattern.match(i['uuid']):
551 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
552 elif group_uuid_pattern.match(i['uuid']):
553 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
554 elif link_uuid_pattern.match(i['uuid']):
555 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
556 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
559 elif uuid_pattern.match(i['uuid']):
560 return ObjectFile(self.parent_inode, i)
565 if self.project_object_file == None:
566 self.project_object_file = ObjectFile(self.inode, self.project_object)
567 self.inodes.add_entry(self.project_object_file)
571 if i['name'] is None or len(i['name']) == 0:
573 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
574 # collection or subproject
576 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
579 elif 'kind' in i and i['kind'].startswith('arvados#'):
581 return "{}.{}".format(i['name'], i['kind'][8:])
586 if isinstance(a, CollectionDirectory):
587 return a.collection_locator == i['uuid']
588 elif isinstance(a, ProjectDirectory):
589 return a.uuid == i['uuid']
590 elif isinstance(a, ObjectFile):
591 return a.uuid == i['uuid'] and not a.stale()
594 with llfuse.lock_released:
595 if group_uuid_pattern.match(self.uuid):
596 self.project_object = self.api.groups().get(
597 uuid=self.uuid).execute(num_retries=self.num_retries)
598 elif user_uuid_pattern.match(self.uuid):
599 self.project_object = self.api.users().get(
600 uuid=self.uuid).execute(num_retries=self.num_retries)
602 contents = arvados.util.list_all(self.api.groups().contents,
603 self.num_retries, uuid=self.uuid)
604 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
605 contents += arvados.util.list_all(
606 self.api.links().list, self.num_retries,
607 filters=[['tail_uuid', '=', self.uuid],
608 ['link_class', '=', 'name']])
610 # end with llfuse.lock_released, re-acquire lock
615 self.createDirectory)
617 def __getitem__(self, item):
619 if item == '.arvados#project':
620 return self.project_object_file
622 return super(ProjectDirectory, self).__getitem__(item)
624 def __contains__(self, k):
625 if k == '.arvados#project':
628 return super(ProjectDirectory, self).__contains__(k)
631 class SharedDirectory(Directory):
632 '''A special directory that represents users or groups who have shared projects with me.'''
634 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
635 poll=False, poll_time=60):
636 super(SharedDirectory, self).__init__(parent_inode)
639 self.num_retries = num_retries
640 self.current_user = api.users().current().execute(num_retries=num_retries)
642 self._poll_time = poll_time
645 with llfuse.lock_released:
646 all_projects = arvados.util.list_all(
647 self.api.groups().list, self.num_retries,
648 filters=[['group_class','=','project']])
650 for ob in all_projects:
651 objects[ob['uuid']] = ob
655 for ob in all_projects:
656 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
658 root_owners[ob['owner_uuid']] = True
660 lusers = arvados.util.list_all(
661 self.api.users().list, self.num_retries,
662 filters=[['uuid','in', list(root_owners)]])
663 lgroups = arvados.util.list_all(
664 self.api.groups().list, self.num_retries,
665 filters=[['uuid','in', list(root_owners)]])
671 objects[l["uuid"]] = l
673 objects[l["uuid"]] = l
676 for r in root_owners:
680 contents[obr["name"]] = obr
681 if "first_name" in obr:
682 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
685 if r['owner_uuid'] not in objects:
686 contents[r['name']] = r
688 # end with llfuse.lock_released, re-acquire lock
691 self.merge(contents.items(),
693 lambda a, i: a.uuid == i[1]['uuid'],
694 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
699 class FileHandle(object):
700 '''Connects a numeric file handle to a File or Directory object that has
701 been opened by the client.'''
703 def __init__(self, fh, entry):
708 class Inodes(object):
709 '''Manage the set of inodes. This is the mapping from a numeric id
710 to a concrete File or Directory object'''
714 self._counter = itertools.count(llfuse.ROOT_INODE)
716 def __getitem__(self, item):
717 return self._entries[item]
719 def __setitem__(self, key, item):
720 self._entries[key] = item
723 return self._entries.iterkeys()
726 return self._entries.items()
728 def __contains__(self, k):
729 return k in self._entries
731 def add_entry(self, entry):
732 entry.inode = next(self._counter)
733 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Call llfuse.invalidate_inode for entry, then drop it from the inode table.

    Raises KeyError if the entry's inode is not in the table.
    '''
    inode = entry.inode
    llfuse.invalidate_inode(inode)
    self._entries.pop(inode)
740 class Operations(llfuse.Operations):
741 '''This is the main interface with llfuse. The methods on this object are
742 called by llfuse threads to service FUSE events to query and read from
745 llfuse has its own global lock which is acquired before calling a request handler,
746 so request handlers do not run concurrently unless the lock is explicitly released
747 using "with llfuse.lock_released:"'''
749 def __init__(self, uid, gid, encoding="utf-8"):
750 super(Operations, self).__init__()
752 self.inodes = Inodes()
755 self.encoding = encoding
757 # dict of inode to filehandle
758 self._filehandles = {}
759 self._filehandles_counter = 1
761 # Other threads that need to wait until the fuse driver
762 # is fully initialized should wait() on this event object.
763 self.initlock = threading.Event()
766 # Allow threads that are waiting for the driver to finish
767 # initializing to continue
770 def access(self, inode, mode, ctx):
773 def getattr(self, inode):
774 if inode not in self.inodes:
775 raise llfuse.FUSEError(errno.ENOENT)
777 e = self.inodes[inode]
779 entry = llfuse.EntryAttributes()
782 entry.entry_timeout = 300
783 entry.attr_timeout = 300
785 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
786 if isinstance(e, Directory):
787 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
788 elif isinstance(e, StreamReaderFile):
789 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
791 entry.st_mode |= stat.S_IFREG
794 entry.st_uid = self.uid
795 entry.st_gid = self.gid
798 entry.st_size = e.size()
800 entry.st_blksize = 512
801 entry.st_blocks = (e.size()/512)+1
802 entry.st_atime = int(e.atime())
803 entry.st_mtime = int(e.mtime())
804 entry.st_ctime = int(e.mtime())
808 def lookup(self, parent_inode, name):
809 name = unicode(name, self.encoding)
810 _logger.debug("arv-mount lookup: parent_inode %i name %s",
817 if parent_inode in self.inodes:
818 p = self.inodes[parent_inode]
820 inode = p.parent_inode
821 elif isinstance(p, Directory) and name in p:
822 inode = p[name].inode
825 return self.getattr(inode)
827 raise llfuse.FUSEError(errno.ENOENT)
829 def open(self, inode, flags):
830 if inode in self.inodes:
831 p = self.inodes[inode]
833 raise llfuse.FUSEError(errno.ENOENT)
835 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
836 raise llfuse.FUSEError(errno.EROFS)
838 if isinstance(p, Directory):
839 raise llfuse.FUSEError(errno.EISDIR)
841 fh = self._filehandles_counter
842 self._filehandles_counter += 1
843 self._filehandles[fh] = FileHandle(fh, p)
846 def read(self, fh, off, size):
847 _logger.debug("arv-mount read %i %i %i", fh, off, size)
848 if fh in self._filehandles:
849 handle = self._filehandles[fh]
851 raise llfuse.FUSEError(errno.EBADF)
854 handle.entry._atime = time.time()
857 with llfuse.lock_released:
858 return handle.entry.readfrom(off, size)
859 except arvados.errors.NotFoundError as e:
860 _logger.warning("Block not found: " + str(e))
861 raise llfuse.FUSEError(errno.EIO)
864 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the file handle fh; a handle that is not registered is ignored.'''
    self._filehandles.pop(fh, None)
870 def opendir(self, inode):
871 _logger.debug("arv-mount opendir: inode %i", inode)
873 if inode in self.inodes:
874 p = self.inodes[inode]
876 raise llfuse.FUSEError(errno.ENOENT)
878 if not isinstance(p, Directory):
879 raise llfuse.FUSEError(errno.ENOTDIR)
881 fh = self._filehandles_counter
882 self._filehandles_counter += 1
883 if p.parent_inode in self.inodes:
884 parent = self.inodes[p.parent_inode]
886 raise llfuse.FUSEError(errno.EIO)
889 p._atime = time.time()
891 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
894 def readdir(self, fh, off):
895 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
897 if fh in self._filehandles:
898 handle = self._filehandles[fh]
900 raise llfuse.FUSEError(errno.EBADF)
902 _logger.debug("arv-mount handle.entry %s", handle.entry)
905 while e < len(handle.entry):
906 if handle.entry[e][1].inode in self.inodes:
908 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
909 except UnicodeEncodeError:
def releasedir(self, fh):
    '''Forget the directory handle fh.

    A handle that is not registered is ignored, the same way release()
    treats unknown file handles, so a stray or repeated releasedir does
    not surface as a KeyError from the request handler.
    '''
    self._filehandles.pop(fh, None)
917 st = llfuse.StatvfsData()
918 st.f_bsize = 64 * 1024
931 # The llfuse documentation recommends only overloading functions that
932 # are actually implemented, as the default implementation will raise ENOSYS.
933 # However, there is a bug in the llfuse default implementation of create()
934 # "create() takes exactly 5 positional arguments (6 given)" which will crash
936 # The workaround is to implement it with the proper number of parameters,
937 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Reject every create request with EROFS.

    The five placeholder parameters exist only so the handler accepts the
    arguments it is called with; none of them is inspected.
    '''
    read_only_error = llfuse.FUSEError(errno.EROFS)
    raise read_only_error