# FUSE driver for Arvados Keep
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
26 _logger = logging.getLogger('arvados.arvados_fuse')
# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
class SafeApi(object):
    """Threadsafe wrapper for an Arvados API client.

    Stores and returns a different API client object per thread, because
    httplib2, which underlies apiclient, is not threadsafe.
    """

    def __init__(self, config):
        # Read connection settings once; each thread lazily builds its
        # own client from them.
        self.host = config.get('ARVADOS_API_HOST')
        self.api_token = config.get('ARVADOS_API_TOKEN')
        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        self.local = threading.local()
        # One block cache shared by every thread's KeepClient.
        self.block_cache = arvados.KeepBlockCache()

    def localapi(self):
        """Return this thread's API client, creating it on first use."""
        if 'api' not in self.local.__dict__:
            self.local.api = arvados.api(
                version='v1',
                host=self.host, token=self.api_token, insecure=self.insecure)
        return self.local.api

    def localkeep(self):
        """Return this thread's KeepClient, creating it on first use."""
        if 'keep' not in self.local.__dict__:
            self.local.keep = arvados.KeepClient(
                api_client=self.localapi(), block_cache=self.block_cache)
        return self.local.keep

    def __getattr__(self, name):
        # Proxy nonexistent attributes to the thread-local API client.
        try:
            return getattr(self.localapi(), name)
        except AttributeError:
            # Raise an AttributeError naming the attribute the caller
            # actually asked for. (The previous fallback delegated to
            # super().__getattr__, which object does not define, so the
            # resulting error misleadingly complained about
            # '__getattr__' itself.)
            raise AttributeError(name)
def convertTime(t):
    '''Parse an Arvados timestamp string to Unix time.

    Returns 0 when the timestamp is missing (None or empty) or does not
    match the expected UTC format "%Y-%m-%dT%H:%M:%SZ", so callers can
    use the result directly as an mtime.
    '''
    if not t:
        return 0
    try:
        return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
    except (TypeError, ValueError):
        return 0
def sanitize_filename(dirty):
    '''Replace disallowed filename characters with harmless "_".

    Also maps names that cannot be used as directory entries at all to
    safe placeholders: the empty string, "." and ".." would otherwise
    collide with (or escape via) the special dot entries once the NUL
    and "/" characters have been replaced.
    '''
    if dirty is None:
        return None
    elif dirty == '':
        return '_'
    elif dirty == '.':
        return '_'
    elif dirty == '..':
        return '__'
    else:
        return _disallowed_filename_characters.sub('_', dirty)
88 class FreshBase(object):
89 '''Base class for maintaining fresh/stale state to determine when to update.'''
93 self._last_update = time.time()
94 self._atime = time.time()
97 # Mark the value as stale
101 # Test if the entries dict is stale.
106 return (self._last_update + self._poll_time) < self._atime
111 self._last_update = time.time()
116 class File(FreshBase):
117 '''Base for file objects.'''
119 def __init__(self, parent_inode, _mtime=0):
120 super(File, self).__init__()
122 self.parent_inode = parent_inode
128 def readfrom(self, off, size):
class StreamReaderFile(File):
    '''Wraps a StreamFileReader as a file.'''

    def __init__(self, parent_inode, reader, _mtime):
        super(StreamReaderFile, self).__init__(parent_inode, _mtime)
        # The underlying StreamFileReader supplies both size and content.
        self.reader = reader

    def size(self):
        return self.reader.size()

    def readfrom(self, off, size):
        # Delegate range reads straight to the stream reader.
        return self.reader.readfrom(off, size)
class StringFile(File):
    '''Wrap a simple string as a read-only file.'''

    def __init__(self, parent_inode, contents, _mtime):
        super(StringFile, self).__init__(parent_inode, _mtime)
        self.contents = contents

    def size(self):
        return len(self.contents)

    def readfrom(self, off, size):
        # Plain slicing naturally clamps reads that run past the end.
        return self.contents[off:off + size]
class ObjectFile(StringFile):
    '''Wrap a dict as a file holding its serialized JSON form.'''

    def __init__(self, parent_inode, obj):
        super(ObjectFile, self).__init__(parent_inode, "", 0)
        self.uuid = obj['uuid']
        # Populate contents and _mtime from the object immediately, so
        # the file is readable as soon as it is created.
        self.update(obj)

    def update(self, obj):
        # Mirror the object's own modification time when it has one.
        self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
        self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
178 class Directory(FreshBase):
179 '''Generic directory object, backed by a dict.
180 Consists of a set of entries with the key representing the filename
181 and the value referencing a File or Directory object.
184 def __init__(self, parent_inode):
185 super(Directory, self).__init__()
187 '''parent_inode is the integer inode number'''
189 if not isinstance(parent_inode, int):
190 raise Exception("parent_inode should be an int")
191 self.parent_inode = parent_inode
193 self._mtime = time.time()
195 # Overriden by subclasses to implement logic to update the entries dict
196 # when the directory is stale
200 # Only used when computing the size of the disk footprint of the directory
205 def checkupdate(self):
209 except apiclient.errors.HttpError as e:
212 def __getitem__(self, item):
214 return self._entries[item]
218 return self._entries.items()
222 return self._entries.iterkeys()
224 def __contains__(self, k):
226 return k in self._entries
228 def merge(self, items, fn, same, new_entry):
229 '''Helper method for updating the contents of the directory. Takes a list
230 describing the new contents of the directory, reuse entries that are
231 the same in both the old and new lists, create new entries, and delete
232 old entries missing from the new list.
234 items: iterable with new directory contents
236 fn: function to take an entry in 'items' and return the desired file or
237 directory name, or None if this entry should be skipped
239 same: function to compare an existing entry (a File or Directory
240 object) with an entry in the items list to determine whether to keep
243 new_entry: function to create a new directory entry (File or Directory
244 object) from an entry in the items list.
248 oldentries = self._entries
252 name = sanitize_filename(fn(i))
254 if name in oldentries and same(oldentries[name], i):
255 # move existing directory entry over
256 self._entries[name] = oldentries[name]
259 # create new directory entry
262 self._entries[name] = self.inodes.add_entry(ent)
265 # delete any other directory entries that were not in found in 'items'
267 llfuse.invalidate_entry(self.inode, str(i))
268 self.inodes.del_entry(oldentries[i])
272 self._mtime = time.time()
277 '''Delete all entries'''
278 oldentries = self._entries
281 if isinstance(n, Directory):
283 llfuse.invalidate_entry(self.inode, str(n))
284 self.inodes.del_entry(oldentries[n])
285 llfuse.invalidate_inode(self.inode)
292 class CollectionDirectory(Directory):
293 '''Represents the root of a directory tree holding a collection.'''
295 def __init__(self, parent_inode, inodes, api, num_retries, collection):
296 super(CollectionDirectory, self).__init__(parent_inode)
299 self.num_retries = num_retries
300 self.collection_object_file = None
301 self.collection_object = None
302 if isinstance(collection, dict):
303 self.collection_locator = collection['uuid']
305 self.collection_locator = collection
308 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
310 # Used by arv-web.py to switch the contents of the CollectionDirectory
311 def change_collection(self, new_locator):
312 """Switch the contents of the CollectionDirectory. Must be called with llfuse.lock held."""
313 self.collection_locator = new_locator
314 self.collection_object = None
317 def new_collection(self, new_collection_object, coll_reader):
318 self.collection_object = new_collection_object
320 if self.collection_object_file is not None:
321 self.collection_object_file.update(self.collection_object)
324 for s in coll_reader.all_streams():
326 for part in s.name().split('/'):
327 if part != '' and part != '.':
328 partname = sanitize_filename(part)
329 if partname not in cwd._entries:
330 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
331 cwd = cwd._entries[partname]
332 for k, v in s.files().items():
333 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
337 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
340 if self.collection_locator is None:
344 with llfuse.lock_released:
345 coll_reader = arvados.CollectionReader(
346 self.collection_locator, self.api, self.api.localkeep(),
347 num_retries=self.num_retries)
348 new_collection_object = coll_reader.api_response() or {}
349 # If the Collection only exists in Keep, there will be no API
350 # response. Fill in the fields we need.
351 if 'uuid' not in new_collection_object:
352 new_collection_object['uuid'] = self.collection_locator
353 if "portable_data_hash" not in new_collection_object:
354 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
355 if 'manifest_text' not in new_collection_object:
356 new_collection_object['manifest_text'] = coll_reader.manifest_text()
357 coll_reader.normalize()
358 # end with llfuse.lock_released, re-acquire lock
360 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
361 self.new_collection(new_collection_object, coll_reader)
365 except arvados.errors.NotFoundError:
366 _logger.exception("arv-mount %s: error", self.collection_locator)
367 except arvados.errors.ArgumentError as detail:
368 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
369 if self.collection_object is not None and "manifest_text" in self.collection_object:
370 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
372 _logger.exception("arv-mount %s: error", self.collection_locator)
373 if self.collection_object is not None and "manifest_text" in self.collection_object:
374 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
377 def __getitem__(self, item):
379 if item == '.arvados#collection':
380 if self.collection_object_file is None:
381 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
382 self.inodes.add_entry(self.collection_object_file)
383 return self.collection_object_file
385 return super(CollectionDirectory, self).__getitem__(item)
387 def __contains__(self, k):
388 if k == '.arvados#collection':
391 return super(CollectionDirectory, self).__contains__(k)
394 return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
397 class MagicDirectory(Directory):
398 '''A special directory that logically contains the set of all extant keep
399 locators. When a file is referenced by lookup(), it is tested to see if it
400 is a valid keep locator to a manifest, and if so, loads the manifest
401 contents as a subdirectory of this directory with the locator as the
402 directory name. Since querying a list of all extant keep locators is
403 impractical, only collections that have already been accessed are visible
408 This directory provides access to Arvados collections as subdirectories listed
409 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
410 the form '1234567890abcdefghijklmnopqrstuv+123').
412 Note that this directory will appear empty until you attempt to access a
413 specific collection subdirectory (such as trying to 'cd' into it), at which
414 point the collection will actually be looked up on the server and the directory
415 will appear if it exists.
418 def __init__(self, parent_inode, inodes, api, num_retries):
419 super(MagicDirectory, self).__init__(parent_inode)
422 self.num_retries = num_retries
424 def __setattr__(self, name, value):
425 super(MagicDirectory, self).__setattr__(name, value)
426 # When we're assigned an inode, add a README.
427 if ((name == 'inode') and (self.inode is not None) and
428 (not self._entries)):
429 self._entries['README'] = self.inodes.add_entry(
430 StringFile(self.inode, self.README_TEXT, time.time()))
431 # If we're the root directory, add an identical by_id subdirectory.
432 if self.inode == llfuse.ROOT_INODE:
433 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
434 self.inode, self.inodes, self.api, self.num_retries))
436 def __contains__(self, k):
437 if k in self._entries:
440 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
444 e = self.inodes.add_entry(CollectionDirectory(
445 self.inode, self.inodes, self.api, self.num_retries, k))
451 except Exception as e:
452 _logger.debug('arv-mount exception keep %s', e)
455 def __getitem__(self, item):
457 return self._entries[item]
459 raise KeyError("No collection with id " + item)
462 class RecursiveInvalidateDirectory(Directory):
463 def invalidate(self):
464 if self.inode == llfuse.ROOT_INODE:
465 llfuse.lock.acquire()
467 super(RecursiveInvalidateDirectory, self).invalidate()
468 for a in self._entries:
469 self._entries[a].invalidate()
473 if self.inode == llfuse.ROOT_INODE:
474 llfuse.lock.release()
477 class TagsDirectory(RecursiveInvalidateDirectory):
478 '''A special directory that contains as subdirectories all tags visible to the user.'''
480 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
481 super(TagsDirectory, self).__init__(parent_inode)
484 self.num_retries = num_retries
486 self._poll_time = poll_time
489 with llfuse.lock_released:
490 tags = self.api.links().list(
491 filters=[['link_class', '=', 'tag']],
492 select=['name'], distinct=True
493 ).execute(num_retries=self.num_retries)
495 self.merge(tags['items'],
497 lambda a, i: a.tag == i['name'],
498 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
501 class TagDirectory(Directory):
502 '''A special directory that contains as subdirectories all collections visible
503 to the user that are tagged with a particular tag.
506 def __init__(self, parent_inode, inodes, api, num_retries, tag,
507 poll=False, poll_time=60):
508 super(TagDirectory, self).__init__(parent_inode)
511 self.num_retries = num_retries
514 self._poll_time = poll_time
517 with llfuse.lock_released:
518 taggedcollections = self.api.links().list(
519 filters=[['link_class', '=', 'tag'],
520 ['name', '=', self.tag],
521 ['head_uuid', 'is_a', 'arvados#collection']],
523 ).execute(num_retries=self.num_retries)
524 self.merge(taggedcollections['items'],
525 lambda i: i['head_uuid'],
526 lambda a, i: a.collection_locator == i['head_uuid'],
527 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
530 class ProjectDirectory(Directory):
531 '''A special directory that contains the contents of a project.'''
533 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
534 poll=False, poll_time=60):
535 super(ProjectDirectory, self).__init__(parent_inode)
538 self.num_retries = num_retries
539 self.project_object = project_object
540 self.project_object_file = None
541 self.uuid = project_object['uuid']
543 self._poll_time = poll_time
545 def createDirectory(self, i):
546 if collection_uuid_pattern.match(i['uuid']):
547 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
548 elif group_uuid_pattern.match(i['uuid']):
549 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
550 elif link_uuid_pattern.match(i['uuid']):
551 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
552 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
555 elif uuid_pattern.match(i['uuid']):
556 return ObjectFile(self.parent_inode, i)
561 if self.project_object_file == None:
562 self.project_object_file = ObjectFile(self.inode, self.project_object)
563 self.inodes.add_entry(self.project_object_file)
567 if i['name'] is None or len(i['name']) == 0:
569 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
570 # collection or subproject
572 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
575 elif 'kind' in i and i['kind'].startswith('arvados#'):
577 return "{}.{}".format(i['name'], i['kind'][8:])
582 if isinstance(a, CollectionDirectory):
583 return a.collection_locator == i['uuid']
584 elif isinstance(a, ProjectDirectory):
585 return a.uuid == i['uuid']
586 elif isinstance(a, ObjectFile):
587 return a.uuid == i['uuid'] and not a.stale()
590 with llfuse.lock_released:
591 if group_uuid_pattern.match(self.uuid):
592 self.project_object = self.api.groups().get(
593 uuid=self.uuid).execute(num_retries=self.num_retries)
594 elif user_uuid_pattern.match(self.uuid):
595 self.project_object = self.api.users().get(
596 uuid=self.uuid).execute(num_retries=self.num_retries)
598 contents = arvados.util.list_all(self.api.groups().contents,
599 self.num_retries, uuid=self.uuid)
600 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
601 contents += arvados.util.list_all(
602 self.api.links().list, self.num_retries,
603 filters=[['tail_uuid', '=', self.uuid],
604 ['link_class', '=', 'name']])
606 # end with llfuse.lock_released, re-acquire lock
611 self.createDirectory)
613 def __getitem__(self, item):
615 if item == '.arvados#project':
616 return self.project_object_file
618 return super(ProjectDirectory, self).__getitem__(item)
620 def __contains__(self, k):
621 if k == '.arvados#project':
624 return super(ProjectDirectory, self).__contains__(k)
627 class SharedDirectory(Directory):
628 '''A special directory that represents users or groups who have shared projects with me.'''
630 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
631 poll=False, poll_time=60):
632 super(SharedDirectory, self).__init__(parent_inode)
635 self.num_retries = num_retries
636 self.current_user = api.users().current().execute(num_retries=num_retries)
638 self._poll_time = poll_time
641 with llfuse.lock_released:
642 all_projects = arvados.util.list_all(
643 self.api.groups().list, self.num_retries,
644 filters=[['group_class','=','project']])
646 for ob in all_projects:
647 objects[ob['uuid']] = ob
651 for ob in all_projects:
652 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
654 root_owners[ob['owner_uuid']] = True
656 lusers = arvados.util.list_all(
657 self.api.users().list, self.num_retries,
658 filters=[['uuid','in', list(root_owners)]])
659 lgroups = arvados.util.list_all(
660 self.api.groups().list, self.num_retries,
661 filters=[['uuid','in', list(root_owners)]])
667 objects[l["uuid"]] = l
669 objects[l["uuid"]] = l
672 for r in root_owners:
676 contents[obr["name"]] = obr
677 if "first_name" in obr:
678 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
681 if r['owner_uuid'] not in objects:
682 contents[r['name']] = r
684 # end with llfuse.lock_released, re-acquire lock
687 self.merge(contents.items(),
689 lambda a, i: a.uuid == i[1]['uuid'],
690 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
class FileHandle(object):
    '''Connects a numeric file handle to a File or Directory object
    that has been opened by the client.'''

    def __init__(self, fh, entry):
        # fh: the numeric handle issued to the kernel.
        self.fh = fh
        # entry: the File/Directory (or directory listing) behind it.
        self.entry = entry
class Inodes(object):
    '''Manage the set of inodes: the mapping from a numeric id to a
    concrete File or Directory object.'''

    def __init__(self):
        self._entries = {}
        # Inode numbers are issued sequentially, starting at the FUSE
        # root inode so the first entry added becomes the root.
        self._counter = itertools.count(llfuse.ROOT_INODE)

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        return self._entries.iterkeys()

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def add_entry(self, entry):
        """Assign the next inode number to entry, register and return it."""
        entry.inode = next(self._counter)
        self._entries[entry.inode] = entry
        return entry

    def del_entry(self, entry):
        # Tell the kernel the inode is gone before forgetting it.
        llfuse.invalidate_inode(entry.inode)
        del self._entries[entry.inode]
736 class Operations(llfuse.Operations):
737 '''This is the main interface with llfuse. The methods on this object are
738 called by llfuse threads to service FUSE events to query and read from
741 llfuse has its own global lock which is acquired before calling a request handler,
742 so request handlers do not run concurrently unless the lock is explicitly released
743 using "with llfuse.lock_released:"'''
745 def __init__(self, uid, gid, encoding="utf-8"):
746 super(Operations, self).__init__()
748 self.inodes = Inodes()
751 self.encoding = encoding
753 # dict of inode to filehandle
754 self._filehandles = {}
755 self._filehandles_counter = 1
757 # Other threads that need to wait until the fuse driver
758 # is fully initialized should wait() on this event object.
759 self.initlock = threading.Event()
762 # Allow threads that are waiting for the driver to be finished
763 # initializing to continue
766 def access(self, inode, mode, ctx):
769 def getattr(self, inode):
770 if inode not in self.inodes:
771 raise llfuse.FUSEError(errno.ENOENT)
773 e = self.inodes[inode]
775 entry = llfuse.EntryAttributes()
778 entry.entry_timeout = 300
779 entry.attr_timeout = 300
781 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
782 if isinstance(e, Directory):
783 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
784 elif isinstance(e, StreamReaderFile):
785 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
787 entry.st_mode |= stat.S_IFREG
790 entry.st_uid = self.uid
791 entry.st_gid = self.gid
794 entry.st_size = e.size()
796 entry.st_blksize = 512
797 entry.st_blocks = (e.size()/512)+1
798 entry.st_atime = int(e.atime())
799 entry.st_mtime = int(e.mtime())
800 entry.st_ctime = int(e.mtime())
804 def lookup(self, parent_inode, name):
805 name = unicode(name, self.encoding)
806 _logger.debug("arv-mount lookup: parent_inode %i name %s",
813 if parent_inode in self.inodes:
814 p = self.inodes[parent_inode]
816 inode = p.parent_inode
817 elif isinstance(p, Directory) and name in p:
818 inode = p[name].inode
821 return self.getattr(inode)
823 raise llfuse.FUSEError(errno.ENOENT)
825 def open(self, inode, flags):
826 if inode in self.inodes:
827 p = self.inodes[inode]
829 raise llfuse.FUSEError(errno.ENOENT)
831 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
832 raise llfuse.FUSEError(errno.EROFS)
834 if isinstance(p, Directory):
835 raise llfuse.FUSEError(errno.EISDIR)
837 fh = self._filehandles_counter
838 self._filehandles_counter += 1
839 self._filehandles[fh] = FileHandle(fh, p)
842 def read(self, fh, off, size):
843 _logger.debug("arv-mount read %i %i %i", fh, off, size)
844 if fh in self._filehandles:
845 handle = self._filehandles[fh]
847 raise llfuse.FUSEError(errno.EBADF)
850 handle.entry._atime = time.time()
853 with llfuse.lock_released:
854 return handle.entry.readfrom(off, size)
855 except arvados.errors.NotFoundError as e:
856 _logger.warning("Block not found: " + str(e))
857 raise llfuse.FUSEError(errno.EIO)
860 raise llfuse.FUSEError(errno.EIO)
862 def release(self, fh):
863 if fh in self._filehandles:
864 del self._filehandles[fh]
866 def opendir(self, inode):
867 _logger.debug("arv-mount opendir: inode %i", inode)
869 if inode in self.inodes:
870 p = self.inodes[inode]
872 raise llfuse.FUSEError(errno.ENOENT)
874 if not isinstance(p, Directory):
875 raise llfuse.FUSEError(errno.ENOTDIR)
877 fh = self._filehandles_counter
878 self._filehandles_counter += 1
879 if p.parent_inode in self.inodes:
880 parent = self.inodes[p.parent_inode]
882 raise llfuse.FUSEError(errno.EIO)
885 p._atime = time.time()
887 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
890 def readdir(self, fh, off):
891 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
893 if fh in self._filehandles:
894 handle = self._filehandles[fh]
896 raise llfuse.FUSEError(errno.EBADF)
898 _logger.debug("arv-mount handle.entry %s", handle.entry)
901 while e < len(handle.entry):
902 if handle.entry[e][1].inode in self.inodes:
904 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
905 except UnicodeEncodeError:
909 def releasedir(self, fh):
910 del self._filehandles[fh]
913 st = llfuse.StatvfsData()
914 st.f_bsize = 64 * 1024
927 # The llfuse documentation recommends only overloading functions that
928 # are actually implemented, as the default implementation will raise ENOSYS.
929 # However, there is a bug in the llfuse default implementation of create()
930 # "create() takes exactly 5 positional arguments (6 given)" which will crash
932 # The workaround is to implement it with the proper number of parameters,
933 # and then everything works out.
934 def create(self, p1, p2, p3, p4, p5):
935 raise llfuse.FUSEError(errno.EROFS)