2 # FUSE driver for Arvados Keep
8 from llfuse import FUSEError
23 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
25 _logger = logging.getLogger('arvados.arvados_fuse')
# Pattern for every character that FUSE or Linux cannot accept inside a
# filename.  A collection filename containing one of them shows up in the
# fuse mount with that character replaced by an underscore.
_disallowed_filename_characters = re.compile(
    '[\x00/]'
)
32 class SafeApi(object):
33 '''Threadsafe wrapper for API object. This stores and returns a different api
34 object per thread, because httplib2 which underlies apiclient is not
38 def __init__(self, config):
39 self.host = config.get('ARVADOS_API_HOST')
40 self.api_token = config.get('ARVADOS_API_TOKEN')
41 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
42 self.local = threading.local()
43 self.block_cache = arvados.KeepBlockCache()
46 if 'api' not in self.local.__dict__:
47 self.local.api = arvados.api('v1', False, self.host,
48 self.api_token, self.insecure)
52 if 'keep' not in self.local.__dict__:
53 self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
54 return self.local.keep
56 def __getattr__(self, name):
57 # Proxy nonexistent attributes to the local API client.
59 return getattr(self.localapi(), name)
60 except AttributeError:
61 return super(SafeApi, self).__getattr__(name)
65 '''Parse Arvados timestamp to unix time.'''
67 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
68 except (TypeError, ValueError):
71 def sanitize_filename(dirty):
72 '''Replace disallowed filename characters with harmless "_".'''
82 return _disallowed_filename_characters.sub('_', dirty)
85 class FreshBase(object):
86 '''Base class for maintaining fresh/stale state to determine when to update.'''
90 self._last_update = time.time()
91 self._atime = time.time()
94 # Mark the value as stale
98 # Test if the entries dict is stale.
103 return (self._last_update + self._poll_time) < self._atime
108 self._last_update = time.time()
113 class File(FreshBase):
114 '''Base for file objects.'''
116 def __init__(self, parent_inode, _mtime=0):
117 super(File, self).__init__()
119 self.parent_inode = parent_inode
125 def readfrom(self, off, size):
132 class StreamReaderFile(File):
133 '''Wraps a StreamFileReader as a file.'''
135 def __init__(self, parent_inode, reader, _mtime):
136 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
140 return self.reader.size()
def readfrom(self, off, size):
    '''Delegate the read at offset off with length size to the wrapped
    reader and return whatever it produces.'''
    wrapped = self.reader
    return wrapped.readfrom(off, size)
149 class StringFile(File):
150 '''Wrap a simple string as a file'''
151 def __init__(self, parent_inode, contents, _mtime):
152 super(StringFile, self).__init__(parent_inode, _mtime)
153 self.contents = contents
156 return len(self.contents)
def readfrom(self, off, size):
    '''Return the slice of the stored contents that begins at off and
    covers at most size items.'''
    end = off + size
    return self.contents[off:end]
162 class ObjectFile(StringFile):
163 '''Wrap a dict as a serialized json object.'''
165 def __init__(self, parent_inode, obj):
166 super(ObjectFile, self).__init__(parent_inode, "", 0)
167 self.uuid = obj['uuid']
def update(self, obj):
    '''Refresh this file from obj.

    The modification time is taken from obj['modified_at'] (converted with
    convertTime) when that key is present, otherwise it is reset to 0.  The
    contents become obj serialized as indented, key-sorted JSON followed by
    a newline.
    '''
    if 'modified_at' in obj:
        self._mtime = convertTime(obj['modified_at'])
    else:
        self._mtime = 0
    serialized = json.dumps(obj, indent=4, sort_keys=True)
    self.contents = serialized + "\n"
175 class Directory(FreshBase):
176 '''Generic directory object, backed by a dict.
177 Consists of a set of entries with the key representing the filename
178 and the value referencing a File or Directory object.
181 def __init__(self, parent_inode):
182 super(Directory, self).__init__()
184 '''parent_inode is the integer inode number'''
186 if not isinstance(parent_inode, int):
187 raise Exception("parent_inode should be an int")
188 self.parent_inode = parent_inode
190 self._mtime = time.time()
192 # Overridden by subclasses to implement logic to update the entries dict
193 # when the directory is stale
197 # Only used when computing the size of the disk footprint of the directory
202 def checkupdate(self):
206 except apiclient.errors.HttpError as e:
209 def __getitem__(self, item):
211 return self._entries[item]
215 return self._entries.items()
219 return self._entries.iterkeys()
221 def __contains__(self, k):
223 return k in self._entries
225 def merge(self, items, fn, same, new_entry):
226 '''Helper method for updating the contents of the directory. Takes a list
227 describing the new contents of the directory, reuse entries that are
228 the same in both the old and new lists, create new entries, and delete
229 old entries missing from the new list.
231 items: iterable with new directory contents
233 fn: function to take an entry in 'items' and return the desired file or
234 directory name, or None if this entry should be skipped
236 same: function to compare an existing entry (a File or Directory
237 object) with an entry in the items list to determine whether to keep
240 new_entry: function to create a new directory entry (File or Directory
241 object) from an entry in the items list.
245 oldentries = self._entries
249 name = sanitize_filename(fn(i))
251 if name in oldentries and same(oldentries[name], i):
252 # move existing directory entry over
253 self._entries[name] = oldentries[name]
256 # create new directory entry
259 self._entries[name] = self.inodes.add_entry(ent)
262 # delete any other directory entries that were not found in 'items'
264 llfuse.invalidate_entry(self.inode, str(i))
265 self.inodes.del_entry(oldentries[i])
269 self._mtime = time.time()
274 '''Delete all entries'''
275 oldentries = self._entries
278 if isinstance(n, Directory):
280 llfuse.invalidate_entry(self.inode, str(n))
281 self.inodes.del_entry(oldentries[n])
288 class CollectionDirectory(Directory):
289 '''Represents the root of a directory tree holding a collection.'''
291 def __init__(self, parent_inode, inodes, api, num_retries, collection):
292 super(CollectionDirectory, self).__init__(parent_inode)
295 self.num_retries = num_retries
296 self.collection_object_file = None
297 self.collection_object = None
298 if isinstance(collection, dict):
299 self.collection_locator = collection['uuid']
301 self.collection_locator = collection
304 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
306 def new_collection(self, new_collection_object):
307 self.collection_object = new_collection_object
309 if self.collection_object_file is not None:
310 self.collection_object_file.update(self.collection_object)
313 collection = arvados.CollectionReader(
314 self.collection_object["manifest_text"], self.api,
315 self.api.localkeep(), num_retries=self.num_retries)
316 collection.normalize()
317 for s in collection.all_streams():
319 for part in s.name().split('/'):
320 if part != '' and part != '.':
321 partname = sanitize_filename(part)
322 if partname not in cwd._entries:
323 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
324 cwd = cwd._entries[partname]
325 for k, v in s.files().items():
326 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
330 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
333 with llfuse.lock_released:
334 new_collection_object = self.api.collections().get(
335 uuid=self.collection_locator
336 ).execute(num_retries=self.num_retries)
337 if "portable_data_hash" not in new_collection_object:
338 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
339 # end with llfuse.lock_released, re-acquire lock
341 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
342 self.new_collection(new_collection_object)
346 except apiclient.errors.HttpError as e:
347 if e.resp.status == 404:
348 _logger.warn("arv-mount %s: not found", self.collection_locator)
350 _logger.error("arv-mount %s: error", self.collection_locator)
351 _logger.exception(detail)
352 except arvados.errors.ArgumentError as detail:
353 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
354 if self.collection_object is not None and "manifest_text" in self.collection_object:
355 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
356 except Exception as detail:
357 _logger.error("arv-mount %s: error", self.collection_locator)
358 if self.collection_object is not None and "manifest_text" in self.collection_object:
359 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
360 _logger.exception(detail)
363 def __getitem__(self, item):
365 if item == '.arvados#collection':
366 if self.collection_object_file is None:
367 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
368 self.inodes.add_entry(self.collection_object_file)
369 return self.collection_object_file
371 return super(CollectionDirectory, self).__getitem__(item)
373 def __contains__(self, k):
374 if k == '.arvados#collection':
377 return super(CollectionDirectory, self).__contains__(k)
381 return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
384 class MagicDirectory(Directory):
385 '''A special directory that logically contains the set of all extant keep
386 locators. When a file is referenced by lookup(), it is tested to see if it
387 is a valid keep locator to a manifest, and if so, loads the manifest
388 contents as a subdirectory of this directory with the locator as the
389 directory name. Since querying a list of all extant keep locators is
390 impractical, only collections that have already been accessed are visible
394 def __init__(self, parent_inode, inodes, api, num_retries):
395 super(MagicDirectory, self).__init__(parent_inode)
398 self.num_retries = num_retries
399 # Have to defer creating readme_file because at this point we don't
400 # yet have an inode assigned.
401 self.readme_file = None
403 def create_readme(self):
404 if self.readme_file is None:
405 text = '''This directory provides access to Arvados collections as subdirectories listed
406 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
407 the form '1234567890abcdefghijklmnopqrstuv+123').
409 Note that this directory will appear empty until you attempt to access a
410 specific collection subdirectory (such as trying to 'cd' into it), at which
411 point the collection will actually be looked up on the server and the directory
412 will appear if it exists.
414 self.readme_file = self.inodes.add_entry(StringFile(self.inode, text, time.time()))
415 self._entries["README"] = self.readme_file
417 def __contains__(self, k):
420 if k in self._entries:
423 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
427 e = self.inodes.add_entry(CollectionDirectory(
428 self.inode, self.inodes, self.api, self.num_retries, k))
434 except Exception as e:
435 _logger.debug('arv-mount exception keep %s', e)
440 return self._entries.items()
442 def __getitem__(self, item):
444 return self._entries[item]
446 raise KeyError("No collection with id " + item)
449 class RecursiveInvalidateDirectory(Directory):
450 def invalidate(self):
451 if self.inode == llfuse.ROOT_INODE:
452 llfuse.lock.acquire()
454 super(RecursiveInvalidateDirectory, self).invalidate()
455 for a in self._entries:
456 self._entries[a].invalidate()
457 except Exception as e:
460 if self.inode == llfuse.ROOT_INODE:
461 llfuse.lock.release()
464 class TagsDirectory(RecursiveInvalidateDirectory):
465 '''A special directory that contains as subdirectories all tags visible to the user.'''
467 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
468 super(TagsDirectory, self).__init__(parent_inode)
471 self.num_retries = num_retries
473 self._poll_time = poll_time
476 with llfuse.lock_released:
477 tags = self.api.links().list(
478 filters=[['link_class', '=', 'tag']],
479 select=['name'], distinct=True
480 ).execute(num_retries=self.num_retries)
482 self.merge(tags['items'],
483 lambda i: i['name'] if 'name' in i else i['uuid'],
484 lambda a, i: a.tag == i,
485 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
488 class TagDirectory(Directory):
489 '''A special directory that contains as subdirectories all collections visible
490 to the user that are tagged with a particular tag.
493 def __init__(self, parent_inode, inodes, api, num_retries, tag,
494 poll=False, poll_time=60):
495 super(TagDirectory, self).__init__(parent_inode)
498 self.num_retries = num_retries
501 self._poll_time = poll_time
504 with llfuse.lock_released:
505 taggedcollections = self.api.links().list(
506 filters=[['link_class', '=', 'tag'],
507 ['name', '=', self.tag],
508 ['head_uuid', 'is_a', 'arvados#collection']],
510 ).execute(num_retries=self.num_retries)
511 self.merge(taggedcollections['items'],
512 lambda i: i['head_uuid'],
513 lambda a, i: a.collection_locator == i['head_uuid'],
514 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
517 class ProjectDirectory(Directory):
518 '''A special directory that contains the contents of a project.'''
520 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
521 poll=False, poll_time=60):
522 super(ProjectDirectory, self).__init__(parent_inode)
525 self.num_retries = num_retries
526 self.project_object = project_object
527 self.project_object_file = None
528 self.uuid = project_object['uuid']
530 def createDirectory(self, i):
531 if collection_uuid_pattern.match(i['uuid']):
532 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
533 elif group_uuid_pattern.match(i['uuid']):
534 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
535 elif link_uuid_pattern.match(i['uuid']):
536 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
537 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
540 elif uuid_pattern.match(i['uuid']):
541 return ObjectFile(self.parent_inode, i)
546 if self.project_object_file == None:
547 self.project_object_file = ObjectFile(self.inode, self.project_object)
548 self.inodes.add_entry(self.project_object_file)
552 if i['name'] is None or len(i['name']) == 0:
554 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
555 # collection or subproject
557 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
560 elif 'kind' in i and i['kind'].startswith('arvados#'):
562 return "{}.{}".format(i['name'], i['kind'][8:])
567 if isinstance(a, CollectionDirectory):
568 return a.collection_locator == i['uuid']
569 elif isinstance(a, ProjectDirectory):
570 return a.uuid == i['uuid']
571 elif isinstance(a, ObjectFile):
572 return a.uuid == i['uuid'] and not a.stale()
575 with llfuse.lock_released:
576 if group_uuid_pattern.match(self.uuid):
577 self.project_object = self.api.groups().get(
578 uuid=self.uuid).execute(num_retries=self.num_retries)
579 elif user_uuid_pattern.match(self.uuid):
580 self.project_object = self.api.users().get(
581 uuid=self.uuid).execute(num_retries=self.num_retries)
583 contents = arvados.util.list_all(self.api.groups().contents,
584 self.num_retries, uuid=self.uuid)
585 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
586 contents += arvados.util.list_all(
587 self.api.links().list, self.num_retries,
588 filters=[['tail_uuid', '=', self.uuid],
589 ['link_class', '=', 'name']])
591 # end with llfuse.lock_released, re-acquire lock
596 self.createDirectory)
598 def __getitem__(self, item):
600 if item == '.arvados#project':
601 return self.project_object_file
603 return super(ProjectDirectory, self).__getitem__(item)
605 def __contains__(self, k):
606 if k == '.arvados#project':
609 return super(ProjectDirectory, self).__contains__(k)
612 class SharedDirectory(Directory):
613 '''A special directory that represents users or groups who have shared projects with me.'''
615 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
616 poll=False, poll_time=60):
617 super(SharedDirectory, self).__init__(parent_inode)
620 self.num_retries = num_retries
621 self.current_user = api.users().current().execute(num_retries=num_retries)
623 self._poll_time = poll_time
626 with llfuse.lock_released:
627 all_projects = arvados.util.list_all(
628 self.api.groups().list, self.num_retries,
629 filters=[['group_class','=','project']])
631 for ob in all_projects:
632 objects[ob['uuid']] = ob
636 for ob in all_projects:
637 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
639 root_owners[ob['owner_uuid']] = True
641 lusers = arvados.util.list_all(
642 self.api.users().list, self.num_retries,
643 filters=[['uuid','in', list(root_owners)]])
644 lgroups = arvados.util.list_all(
645 self.api.groups().list, self.num_retries,
646 filters=[['uuid','in', list(root_owners)]])
652 objects[l["uuid"]] = l
654 objects[l["uuid"]] = l
657 for r in root_owners:
661 contents[obr["name"]] = obr
662 if "first_name" in obr:
663 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
666 if r['owner_uuid'] not in objects:
667 contents[r['name']] = r
669 # end with llfuse.lock_released, re-acquire lock
672 self.merge(contents.items(),
674 lambda a, i: a.uuid == i[1]['uuid'],
675 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
676 except Exception as e:
680 class FileHandle(object):
681 '''Connects a numeric file handle to a File or Directory object that has
682 been opened by the client.'''
684 def __init__(self, fh, entry):
689 class Inodes(object):
690 '''Manage the set of inodes. This is the mapping from a numeric id
691 to a concrete File or Directory object'''
695 self._counter = llfuse.ROOT_INODE
697 def __getitem__(self, item):
698 return self._entries[item]
700 def __setitem__(self, key, item):
701 self._entries[key] = item
704 return self._entries.iterkeys()
707 return self._entries.items()
709 def __contains__(self, k):
710 return k in self._entries
712 def add_entry(self, entry):
713 entry.inode = self._counter
714 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Invalidate entry's inode through llfuse, then remove the entry from
    the inode table.'''
    inode = entry.inode
    llfuse.invalidate_inode(inode)
    del self._entries[inode]
722 class Operations(llfuse.Operations):
723 '''This is the main interface with llfuse. The methods on this object are
724 called by llfuse threads to service FUSE events to query and read from
727 llfuse has its own global lock which is acquired before calling a request handler,
728 so request handlers do not run concurrently unless the lock is explicitly released
729 using "with llfuse.lock_released:"'''
731 def __init__(self, uid, gid, encoding="utf-8"):
732 super(Operations, self).__init__()
734 self.inodes = Inodes()
737 self.encoding = encoding
739 # dict of inode to filehandle
740 self._filehandles = {}
741 self._filehandles_counter = 1
743 # Other threads that need to wait until the fuse driver
744 # is fully initialized should wait() on this event object.
745 self.initlock = threading.Event()
748 # Allow threads that are waiting for the driver to be finished
749 # initializing to continue
752 def access(self, inode, mode, ctx):
755 def getattr(self, inode):
756 if inode not in self.inodes:
757 raise llfuse.FUSEError(errno.ENOENT)
759 e = self.inodes[inode]
761 entry = llfuse.EntryAttributes()
764 entry.entry_timeout = 300
765 entry.attr_timeout = 300
767 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
768 if isinstance(e, Directory):
769 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
771 entry.st_mode |= stat.S_IFREG
774 entry.st_uid = self.uid
775 entry.st_gid = self.gid
778 entry.st_size = e.size()
780 entry.st_blksize = 512
781 entry.st_blocks = (e.size()/512)+1
782 entry.st_atime = int(e.atime())
783 entry.st_mtime = int(e.mtime())
784 entry.st_ctime = int(e.mtime())
788 def lookup(self, parent_inode, name):
789 name = unicode(name, self.encoding)
790 _logger.debug("arv-mount lookup: parent_inode %i name %s",
797 if parent_inode in self.inodes:
798 p = self.inodes[parent_inode]
800 inode = p.parent_inode
801 elif isinstance(p, Directory) and name in p:
802 inode = p[name].inode
805 return self.getattr(inode)
807 raise llfuse.FUSEError(errno.ENOENT)
809 def open(self, inode, flags):
810 if inode in self.inodes:
811 p = self.inodes[inode]
813 raise llfuse.FUSEError(errno.ENOENT)
815 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
816 raise llfuse.FUSEError(errno.EROFS)
818 if isinstance(p, Directory):
819 raise llfuse.FUSEError(errno.EISDIR)
821 fh = self._filehandles_counter
822 self._filehandles_counter += 1
823 self._filehandles[fh] = FileHandle(fh, p)
826 def read(self, fh, off, size):
827 _logger.debug("arv-mount read %i %i %i", fh, off, size)
828 if fh in self._filehandles:
829 handle = self._filehandles[fh]
831 raise llfuse.FUSEError(errno.EBADF)
834 handle.entry._atime = time.time()
837 with llfuse.lock_released:
838 return handle.entry.readfrom(off, size)
839 except arvados.errors.NotFoundError as e:
840 _logger.warning("Block not found: " + str(e))
841 raise llfuse.FUSEError(errno.EIO)
842 except Exception as e:
844 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the open file handle fh; a handle that is not registered is
    silently ignored.'''
    self._filehandles.pop(fh, None)
850 def opendir(self, inode):
851 _logger.debug("arv-mount opendir: inode %i", inode)
853 if inode in self.inodes:
854 p = self.inodes[inode]
856 raise llfuse.FUSEError(errno.ENOENT)
858 if not isinstance(p, Directory):
859 raise llfuse.FUSEError(errno.ENOTDIR)
861 fh = self._filehandles_counter
862 self._filehandles_counter += 1
863 if p.parent_inode in self.inodes:
864 parent = self.inodes[p.parent_inode]
866 raise llfuse.FUSEError(errno.EIO)
869 p._atime = time.time()
871 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
874 def readdir(self, fh, off):
875 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
877 if fh in self._filehandles:
878 handle = self._filehandles[fh]
880 raise llfuse.FUSEError(errno.EBADF)
882 _logger.debug("arv-mount handle.entry %s", handle.entry)
885 while e < len(handle.entry):
886 if handle.entry[e][1].inode in self.inodes:
888 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
889 except UnicodeEncodeError:
def releasedir(self, fh):
    '''Forget the open directory handle fh.

    Mirrors release(): a handle that is not (or is no longer) registered is
    ignored instead of raising KeyError out of the request handler.
    '''
    self._filehandles.pop(fh, None)
897 st = llfuse.StatvfsData()
898 st.f_bsize = 64 * 1024
911 # The llfuse documentation recommends only overloading functions that
912 # are actually implemented, as the default implementation will raise ENOSYS.
913 # However, there is a bug in the llfuse default implementation of create()
914 # "create() takes exactly 5 positional arguments (6 given)" which will crash
916 # The workaround is to implement it with the proper number of parameters,
917 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Refuse every create request with EROFS.

    The five placeholder parameters exist only so the handler accepts the
    number of arguments llfuse passes; none of them is inspected.
    '''
    read_only_error = llfuse.FUSEError(errno.EROFS)
    raise read_only_error