# FUSE driver for Arvados Keep

import os
import errno
import stat
import json
import re
import time
import calendar
import threading
import itertools
import logging

import llfuse
import apiclient.errors
import arvados
import arvados.util

from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
_logger = logging.getLogger('arvados.arvados_fuse')

# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')
class SafeApi(object):
    """Threadsafe wrapper for API object.

    This stores and returns a different api object per thread, because
    httplib2, which underlies apiclient, is not threadsafe.
    """

    def __init__(self, config):
        self.host = config.get('ARVADOS_API_HOST')
        self.api_token = config.get('ARVADOS_API_TOKEN')
        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        self.local = threading.local()
        self.block_cache = arvados.KeepBlockCache()

    def localapi(self):
        if 'api' not in self.local.__dict__:
            self.local.api = arvados.api(
                host=self.host, token=self.api_token, insecure=self.insecure)
        return self.local.api

    def localkeep(self):
        if 'keep' not in self.local.__dict__:
            self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
        return self.local.keep

    def __getattr__(self, name):
        # Proxy nonexistent attributes to the local API client.
        try:
            return getattr(self.localapi(), name)
        except AttributeError:
            return super(SafeApi, self).__getattr__(name)
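# Illustrative use (assumes an arvados.config-like object that supports
# .get() and .flag_is_true()):
#
#   api = SafeApi(arvados.config)
#   api.collections().list().execute()   # proxied to the thread-local client
#   keep = api.localkeep()               # thread-local KeepClient, shared block cache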
def convertTime(t):
    '''Parse Arvados timestamp to unix time.'''
    try:
        return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
    except (TypeError, ValueError):
        return 0
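# For example, convertTime("2014-01-01T00:00:00Z") returns 1388534400, while a
# missing or malformed timestamp falls back to 0.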
def sanitize_filename(dirty):
    '''Replace disallowed filename characters with harmless "_".'''
    if dirty is None:
        return None
    return _disallowed_filename_characters.sub('_', dirty)
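# For example, sanitize_filename("sub/dir\x00name") returns "sub_dir_name";
# None is passed through unchanged.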
class FreshBase(object):
    '''Base class for maintaining fresh/stale state to determine when to update.'''

    def __init__(self):
        self._stale = True
        self._poll = False
        self._poll_time = 60
        self._last_update = time.time()
        self._atime = time.time()

    # Mark the value as stale
    def invalidate(self):
        self._stale = True

    # Test if the entries dict is stale.
    def stale(self):
        if self._stale:
            return True
        if self._poll:
            return (self._last_update + self._poll_time) < self._atime
        return False

    def fresh(self):
        self._stale = False
        self._last_update = time.time()

    def atime(self):
        return self._atime
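# Freshness protocol used throughout this module: directories call fresh()
# after a successful update(); invalidate() marks an entry stale so the next
# access triggers a re-fetch; and when _poll is enabled, an entry also goes
# stale once the most recent access is more than _poll_time seconds after the
# last update.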
class File(FreshBase):
    '''Base for file objects.'''

    def __init__(self, parent_inode, _mtime=0):
        super(File, self).__init__()
        self.inode = None
        self.parent_inode = parent_inode
        self._mtime = _mtime

    def size(self):
        return 0

    def readfrom(self, off, size):
        return ''

    def mtime(self):
        return self._mtime
class StreamReaderFile(File):
    '''Wraps a StreamFileReader as a file.'''

    def __init__(self, parent_inode, reader, _mtime):
        super(StreamReaderFile, self).__init__(parent_inode, _mtime)
        self.reader = reader

    def size(self):
        return self.reader.size()

    def readfrom(self, off, size):
        return self.reader.readfrom(off, size)
class StringFile(File):
    '''Wrap a simple string as a file.'''

    def __init__(self, parent_inode, contents, _mtime):
        super(StringFile, self).__init__(parent_inode, _mtime)
        self.contents = contents

    def size(self):
        return len(self.contents)

    def readfrom(self, off, size):
        return self.contents[off:(off+size)]
class ObjectFile(StringFile):
    '''Wrap a dict as a serialized json object.'''

    def __init__(self, parent_inode, obj):
        super(ObjectFile, self).__init__(parent_inode, "", 0)
        self.uuid = obj['uuid']
        self.update(obj)

    def update(self, obj):
        self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
        self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
class Directory(FreshBase):
    '''Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    '''

    def __init__(self, parent_inode):
        super(Directory, self).__init__()

        # parent_inode is the integer inode number
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self._entries = {}
        self._mtime = time.time()
    # Overridden by subclasses to implement logic to update the entries dict
    # when the directory is stale
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    def size(self):
        return 0

    def checkupdate(self):
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.debug(e)

    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    def items(self):
        self.checkupdate()
        return self._entries.items()

    def __iter__(self):
        self.checkupdate()
        return self._entries.iterkeys()

    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries
    def merge(self, items, fn, same, new_entry):
        '''Helper method for updating the contents of the directory.  Takes a list
        describing the new contents of the directory, reuses entries that are
        the same in both the old and new lists, creates new entries, and deletes
        old entries missing from the new list.

        items: iterable with new directory contents

        fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.
        '''

        oldentries = self._entries
        self._entries = {}
        for i in items:
            name = sanitize_filename(fn(i))
            if name is not None:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            llfuse.invalidate_entry(self.inode, str(i))
            self.inodes.del_entry(oldentries[i])

        self._mtime = time.time()
        self.fresh()
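    # Illustrative call (see TagDirectory.update() for a real one): given a
    # list of API records, keep entries whose identity matches, create the
    # rest, and drop anything no longer present:
    #
    #   self.merge(records,
    #              lambda i: i['name'],                  # fn: record -> filename
    #              lambda a, i: a.uuid == i['uuid'],     # same: reuse existing entry?
    #              lambda i: ObjectFile(self.inode, i))  # new_entry: build a new one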
    def clear(self):
        '''Delete all entries.'''
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            if isinstance(oldentries[n], Directory):
                oldentries[n].clear()
            llfuse.invalidate_entry(self.inode, str(n))
            self.inodes.del_entry(oldentries[n])
        llfuse.invalidate_inode(self.inode)
        self.invalidate()

    def mtime(self):
        return self._mtime
class CollectionDirectory(Directory):
    '''Represents the root of a directory tree holding a collection.'''

    def __init__(self, parent_inode, inodes, api, num_retries, collection):
        super(CollectionDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self.collection_object_file = None
        self.collection_object = None
        if isinstance(collection, dict):
            self.collection_locator = collection['uuid']
        else:
            self.collection_locator = collection

    def same(self, i):
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
        self.collection_locator = new_locator
        self.collection_object = None
        self.update()
    def new_collection(self, new_collection_object, coll_reader):
        self.collection_object = new_collection_object

        if self.collection_object_file is not None:
            self.collection_object_file.update(self.collection_object)

        self.clear()
        for s in coll_reader.all_streams():
            cwd = self
            for part in s.name().split('/'):
                if part != '' and part != '.':
                    partname = sanitize_filename(part)
                    if partname not in cwd._entries:
                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
                    cwd = cwd._entries[partname]
            for k, v in s.files().items():
                cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
    def update(self):
        try:
            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Collections identified by portable data hash are immutable,
                # so there is nothing to re-fetch.
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            with llfuse.lock_released:
                coll_reader = arvados.CollectionReader(
                    self.collection_locator, self.api, self.api.localkeep(),
                    num_retries=self.num_retries)
                new_collection_object = coll_reader.api_response() or {}
                # If the Collection only exists in Keep, there will be no API
                # response.  Fill in the fields we need.
                if 'uuid' not in new_collection_object:
                    new_collection_object['uuid'] = self.collection_locator
                if "portable_data_hash" not in new_collection_object:
                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
                if 'manifest_text' not in new_collection_object:
                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
                coll_reader.normalize()
            # end with llfuse.lock_released, re-acquire lock

            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                self.new_collection(new_collection_object, coll_reader)

            self.fresh()
            return True
        except arvados.errors.NotFoundError:
            _logger.exception("arv-mount %s: error", self.collection_locator)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        return False
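    # Each collection directory also exposes a virtual '.arvados#collection'
    # file containing the collection's API record as pretty-printed JSON.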
    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#collection':
            if self.collection_object_file is None:
                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
                self.inodes.add_entry(self.collection_object_file)
            return self.collection_object_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)
    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)
    def mtime(self):
        self.checkupdate()
        return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
class MagicDirectory(Directory):
    '''A special directory that logically contains the set of all extant keep
    locators.  When a file is referenced by lookup(), it is tested to see if it
    is a valid keep locator to a manifest, and if so, loads the manifest
    contents as a subdirectory of this directory with the locator as the
    directory name.  Since querying a list of all extant keep locators is
    impractical, only collections that have already been accessed are visible
    to readdir().
    '''

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
"""
    def __init__(self, parent_inode, inodes, api, num_retries):
        super(MagicDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
                (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries))
    def __contains__(self, k):
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        try:
            e = self.inodes.add_entry(CollectionDirectory(
                self.inode, self.inodes, self.api, self.num_retries, k))
            if e.update():
                self._entries[k] = e
                return True
            else:
                return False
        except Exception as e:
            _logger.debug('arv-mount exception keep %s', e)
            return False
    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)
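# Example (illustrative): with the driver mounted at /mnt/keep,
# "ls /mnt/keep" shows only README (plus by_id at the root), while
# "ls /mnt/keep/1234567890abcdefghijklmnopqrstuv+123" triggers the lookup
# above and mounts that collection if it exists.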
class RecursiveInvalidateDirectory(Directory):
    def invalidate(self):
        if self.inode == llfuse.ROOT_INODE:
            llfuse.lock.acquire()
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        finally:
            if self.inode == llfuse.ROOT_INODE:
                llfuse.lock.release()
class TagsDirectory(RecursiveInvalidateDirectory):
    '''A special directory that contains as subdirectories all tags visible to the user.'''

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                ).execute(num_retries=self.num_retries)
        self.merge(tags['items'],
                   lambda i: i['name'],
                   lambda a, i: a.tag == i['name'],
                   lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
class TagDirectory(Directory):
    '''A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    '''

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                ).execute(num_retries=self.num_retries)
        self.merge(taggedcollections['items'],
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
class ProjectDirectory(Directory):
    '''A special directory that contains the contents of a project.'''

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        super(ProjectDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
    def createDirectory(self, i):
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)
        else:
            return None
    def update(self):
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            if 'name' in i:
                if i['name'] is None or len(i['name']) == 0:
                    return None
                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                    # collection or subproject
                    return i['name']
                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                    # name link
                    return i['name']
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    # something else
                    return "{}.{}".format(i['name'], i['kind'][8:])
            else:
                return None
        def samefn(a, i):
            if isinstance(a, CollectionDirectory):
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid == i['uuid'] and not a.stale()
            return False
        with llfuse.lock_released:
            if group_uuid_pattern.match(self.uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.uuid):
                self.project_object = self.api.users().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)

            contents = arvados.util.list_all(self.api.groups().contents,
                                             self.num_retries, uuid=self.uuid)
            # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
            contents += arvados.util.list_all(
                self.api.links().list, self.num_retries,
                filters=[['tail_uuid', '=', self.uuid],
                         ['link_class', '=', 'name']])
        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   namefn,
                   samefn,
                   self.createDirectory)
    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)
class SharedDirectory(Directory):
    '''A special directory that represents users or groups who have shared projects with me.'''

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        super(SharedDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time
    def update(self):
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                obr = objects[r]
                if "name" in obr:
                    contents[obr["name"]] = obr
                if "first_name" in obr:
                    contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r
        # end with llfuse.lock_released, re-acquire lock
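        # At this point 'contents' maps a display name to the API record of
        # each user or group that owns a shared project (a group's 'name', or
        # a user's 'first_name last_name'); projects whose owner record is not
        # visible are listed under the project's own name instead.  Each entry
        # becomes a ProjectDirectory below.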
        self.merge(contents.items(),
                   lambda i: i[0],
                   lambda a, i: a.uuid == i[1]['uuid'],
                   lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
class FileHandle(object):
    '''Connects a numeric file handle to a File or Directory object that has
    been opened by the client.'''

    def __init__(self, fh, entry):
        self.fh = fh
        self.entry = entry
class Inodes(object):
    '''Manage the set of inodes.  This is the mapping from a numeric id
    to a concrete File or Directory object.'''

    def __init__(self):
        self._entries = {}
        self._counter = itertools.count(llfuse.ROOT_INODE)

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        return self._entries.iterkeys()

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def add_entry(self, entry):
        entry.inode = next(self._counter)
        self._entries[entry.inode] = entry
        return entry

    def del_entry(self, entry):
        llfuse.invalidate_inode(entry.inode)
        del self._entries[entry.inode]
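# Because the counter starts at llfuse.ROOT_INODE, the first entry added gets
# the root inode number, so the mount's top-level directory is expected to be
# the first entry added to Inodes.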
class Operations(llfuse.Operations):
    '''This is the main interface with llfuse.  The methods on this object are
    called by llfuse threads to service FUSE events to query and read from
    the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using "with llfuse.lock_released:"'''
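    # Long-running work (Keep block fetches, API calls) is done inside
    # "with llfuse.lock_released:" blocks (see read() below) so that other
    # FUSE requests can be serviced in the meantime.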
    def __init__(self, uid, gid, encoding="utf-8"):
        super(Operations, self).__init__()

        self.inodes = Inodes()
        self.uid = uid
        self.gid = gid
        self.encoding = encoding

        # dict of inode to filehandle
        self._filehandles = {}
        self._filehandles_counter = 1

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

    def init(self):
        # Allow threads that are waiting for the driver to be finished
        # initializing to continue
        self.initlock.set()
    def access(self, inode, mode, ctx):
        return True

    def getattr(self, inode):
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.entry_timeout = 300
        entry.attr_timeout = 300

        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        elif isinstance(e, StreamReaderFile):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
        else:
            entry.st_mode |= stat.S_IFREG
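        # Everything in the mount is read-only (writes fail with EROFS in
        # open()/create()).  Directories get the execute bit so they can be
        # traversed; files backed by collections also get it, presumably so
        # that executable scripts stored in Keep can be run directly.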
        entry.st_uid = self.uid
        entry.st_gid = self.gid

        entry.st_size = e.size()

        entry.st_blksize = 512
        entry.st_blocks = (e.size()/512)+1
        entry.st_atime = int(e.atime())
        entry.st_mtime = int(e.mtime())
        entry.st_ctime = int(e.mtime())

        return entry
    def lookup(self, parent_inode, name):
        name = unicode(name, self.encoding)
        _logger.debug("arv-mount lookup: parent_inode %i name %s",
                      parent_inode, name)
        inode = None

        if name == '.':
            inode = parent_inode
        else:
            if parent_inode in self.inodes:
                p = self.inodes[parent_inode]
                if name == '..':
                    inode = p.parent_inode
                elif isinstance(p, Directory) and name in p:
                    inode = p[name].inode

        if inode is not None:
            return self.getattr(inode)
        else:
            raise llfuse.FUSEError(errno.ENOENT)
    def open(self, inode, flags):
        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
            raise llfuse.FUSEError(errno.EROFS)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        self._filehandles[fh] = FileHandle(fh, p)
        return fh
    def read(self, fh, off, size):
        _logger.debug("arv-mount read %i %i %i", fh, off, size)
        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        # update atime
        handle.entry._atime = time.time()

        try:
            with llfuse.lock_released:
                return handle.entry.readfrom(off, size)
        except arvados.errors.NotFoundError as e:
            _logger.warning("Block not found: " + str(e))
            raise llfuse.FUSEError(errno.EIO)
        except Exception:
            raise llfuse.FUSEError(errno.EIO)
    def release(self, fh):
        if fh in self._filehandles:
            del self._filehandles[fh]

    def opendir(self, inode):
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
        else:
            raise llfuse.FUSEError(errno.EIO)

        # update atime
        p._atime = time.time()

        self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
        return fh
    def readdir(self, fh, off):
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        _logger.debug("arv-mount handle.entry %s", handle.entry)

        e = off
        while e < len(handle.entry):
            if handle.entry[e][1].inode in self.inodes:
                try:
                    yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
                except UnicodeEncodeError:
                    pass
            e += 1

    def releasedir(self, fh):
        del self._filehandles[fh]
    def statfs(self):
        st = llfuse.StatvfsData()
        st.f_bsize = 64 * 1024
        return st
    # The llfuse documentation recommends only overloading functions that
    # are actually implemented, as the default implementation will raise ENOSYS.
    # However, there is a bug in the llfuse default implementation of create()
    # ("create() takes exactly 5 positional arguments (6 given)") which will
    # crash arv-mount.  The workaround is to implement it with the proper
    # number of parameters, and then everything works out.
    def create(self, p1, p2, p3, p4, p5):
        raise llfuse.FUSEError(errno.EROFS)