2 # FUSE driver for Arvados Keep
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# Module-wide logger shared by all classes in this file.
26 _logger = logging.getLogger('arvados.arvados_fuse')

28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
33 class SafeApi(object):
34 """Threadsafe wrapper for API object.

36 This stores and returns a different api object per thread, because
37 httplib2 which underlies apiclient is not threadsafe.

# config: arvados-style settings mapping with .get() and .flag_is_true().
40 def __init__(self, config):
41 self.host = config.get('ARVADOS_API_HOST')
42 self.api_token = config.get('ARVADOS_API_TOKEN')
43 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
# threading.local() gives each thread private 'api' and 'keep' slots.
44 self.local = threading.local()
# A single shared block cache so threads do not duplicate Keep data.
45 self.block_cache = arvados.KeepBlockCache()

# NOTE(review): the 'def localapi(self):' header (around original line 47)
# is not visible in this excerpt; the lines below lazily build the
# per-thread API client from the settings captured in __init__.
48 if 'api' not in self.local.__dict__:
49 self.local.api = arvados.api(
51 host=self.host, token=self.api_token, insecure=self.insecure)

# NOTE(review): the 'def localkeep(self):' header (around original line 54)
# is likewise missing here; this lazily builds a per-thread KeepClient
# that shares the common block cache.
55 if 'keep' not in self.local.__dict__:
56 self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
57 return self.local.keep

59 def __getattr__(self, name):
60 # Proxy nonexistent attributes to the local API client.
62 return getattr(self.localapi(), name)
63 except AttributeError:
64 return super(SafeApi, self).__getattr__(name)
# NOTE(review): the 'def convertTime(t):' header is not visible in this
# excerpt; the lines below are the body of that module-level helper.
68 """Parse Arvados timestamp to unix time."""
# strptime with a trailing 'Z' format plus calendar.timegm interprets
# the timestamp as UTC.
72 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
# A None or malformed timestamp falls through here (the fallback return
# value is on a line not visible in this excerpt).
73 except (TypeError, ValueError):
76 def sanitize_filename(dirty):
77 '''Replace disallowed filename characters with harmless "_".'''
# NOTE(review): original lines 78-86 are not visible in this excerpt;
# judging from the gap they likely handle special values (e.g. None,
# empty string, '.', '..') before reaching the general case below.
# Only NUL and '/' are disallowed (see _disallowed_filename_characters).
87 return _disallowed_filename_characters.sub('_', dirty)
90 class FreshBase(object):
91 '''Base class for maintaining fresh/stale state to determine when to update.'''
# Both timestamps start at construction time; _atime is bumped on access.
95 self._last_update = time.time()
96 self._atime = time.time()

99 # Mark the value as stale
100 def invalidate(self):

103 # Test if the entries dict is stale.
# Stale when the object was accessed after its poll window
# (_last_update + _poll_time) expired.
108 return (self._last_update + self._poll_time) < self._atime
# Record that the contents were just refreshed.
113 self._last_update = time.time()
118 class File(FreshBase):
119 '''Base for file objects.'''

121 def __init__(self, parent_inode, _mtime=0):
122 super(File, self).__init__()
# parent_inode: inode number of the directory containing this file.
124 self.parent_inode = parent_inode

# Subclasses override this to return up to 'size' bytes starting at
# byte offset 'off'; the base implementation body is not visible here.
130 def readfrom(self, off, size):
137 class StreamReaderFile(File):
138 '''Wraps a StreamFileReader as a file.'''

140 def __init__(self, parent_inode, reader, _mtime):
141 super(StreamReaderFile, self).__init__(parent_inode, _mtime)

# NOTE(review): the 'def size(self):' header is not visible in this
# excerpt; size and readfrom both delegate to the wrapped reader.
145 return self.reader.size()

147 def readfrom(self, off, size):
148 return self.reader.readfrom(off, size)
154 class StringFile(File):
155 '''Wrap a simple string as a file'''
156 def __init__(self, parent_inode, contents, _mtime):
157 super(StringFile, self).__init__(parent_inode, _mtime)
# The entire file body is held in memory as a single string.
158 self.contents = contents

# NOTE(review): the 'def size(self):' header is not visible in this
# excerpt; the file size is simply the length of the backing string.
161 return len(self.contents)
163 def readfrom(self, off, size):
164 return self.contents[off:(off+size)]
167 class ObjectFile(StringFile):
168 '''Wrap a dict as a serialized json object.'''

170 def __init__(self, parent_inode, obj):
# Starts as an empty string with mtime 0; update() fills in the JSON.
171 super(ObjectFile, self).__init__(parent_inode, "", 0)
# Remember the record's uuid so callers can match this file to its object.
172 self.uuid = obj['uuid']
175 def update(self, obj):
176 self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
177 self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
180 class Directory(FreshBase):
181 '''Generic directory object, backed by a dict.
182 Consists of a set of entries with the key representing the filename
183 and the value referencing a File or Directory object.

186 def __init__(self, parent_inode):
187 super(Directory, self).__init__()

189 '''parent_inode is the integer inode number'''

191 if not isinstance(parent_inode, int):
192 raise Exception("parent_inode should be an int")
193 self.parent_inode = parent_inode
# Creation time doubles as the initial modification time.
195 self._mtime = time.time()

197 # Overridden by subclasses to implement logic to update the entries dict
198 # when the directory is stale

202 # Only used when computing the size of the disk footprint of the directory

# Refresh the entries if stale; API failures are logged (HttpError caught
# below) rather than propagated up to the FUSE layer.
207 def checkupdate(self):
211 except apiclient.errors.HttpError as e:

214 def __getitem__(self, item):
216 return self._entries[item]

220 return self._entries.items()

224 return self._entries.iterkeys()

226 def __contains__(self, k):
228 return k in self._entries

230 def merge(self, items, fn, same, new_entry):
231 '''Helper method for updating the contents of the directory. Takes a list
232 describing the new contents of the directory, reuse entries that are
233 the same in both the old and new lists, create new entries, and delete
234 old entries missing from the new list.

236 items: iterable with new directory contents

238 fn: function to take an entry in 'items' and return the desired file or
239 directory name, or None if this entry should be skipped

241 same: function to compare an existing entry (a File or Directory
242 object) with an entry in the items list to determine whether to keep
245 new_entry: function to create a new directory entry (File or Directory
246 object) from an entry in the items list.

# Keep the old entries aside; survivors are moved back over below.
250 oldentries = self._entries
# Names are sanitized so they are representable in the FUSE mount.
254 name = sanitize_filename(fn(i))
256 if name in oldentries and same(oldentries[name], i):
257 # move existing directory entry over
258 self._entries[name] = oldentries[name]
261 # create new directory entry
264 self._entries[name] = self.inodes.add_entry(ent)

267 # delete any other directory entries that were not found in 'items'
# The kernel must be told the name is gone so it drops its dentry cache.
269 llfuse.invalidate_entry(self.inode, str(i))
270 self.inodes.del_entry(oldentries[i])
274 self._mtime = time.time()

279 '''Delete all entries'''
280 oldentries = self._entries
# Recurse into subdirectories so their entries are released as well.
283 if isinstance(n, Directory):
285 llfuse.invalidate_entry(self.inode, str(n))
286 self.inodes.del_entry(oldentries[n])
287 llfuse.invalidate_inode(self.inode)
294 class CollectionDirectory(Directory):
295 '''Represents the root of a directory tree holding a collection.'''

297 def __init__(self, parent_inode, inodes, api, num_retries, collection):
298 super(CollectionDirectory, self).__init__(parent_inode)
301 self.num_retries = num_retries
302 self.collection_object_file = None
303 self.collection_object = None
# 'collection' is either a full API record (dict) or a bare locator string.
304 if isinstance(collection, dict):
305 self.collection_locator = collection['uuid']
306 self._mtime = convertTime(collection.get('modified_at'))
308 self.collection_locator = collection

# Matches on either the uuid or the portable data hash of this collection.
311 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

313 # Used by arv-web.py to switch the contents of the CollectionDirectory
314 def change_collection(self, new_locator):
315 """Switch the contents of the CollectionDirectory. Must be called with llfuse.lock held."""
316 self.collection_locator = new_locator
# Dropping the cached record forces a refetch on the next update.
317 self.collection_object = None

320 def new_collection(self, new_collection_object, coll_reader):
321 self.collection_object = new_collection_object
323 self._mtime = convertTime(self.collection_object.get('modified_at'))
325 if self.collection_object_file is not None:
326 self.collection_object_file.update(self.collection_object)

# Rebuild the directory tree from the collection's streams; each '/'
# separated stream-name component becomes a nested Directory.
329 for s in coll_reader.all_streams():
331 for part in s.name().split('/'):
332 if part != '' and part != '.':
333 partname = sanitize_filename(part)
334 if partname not in cwd._entries:
335 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
336 cwd = cwd._entries[partname]
337 for k, v in s.files().items():
338 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))

# Content addressed by portable data hash is immutable, so a cached
# object never needs refreshing in that case.
342 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):

345 if self.collection_locator is None:

# Release the global FUSE lock while doing network I/O.
349 with llfuse.lock_released:
350 coll_reader = arvados.CollectionReader(
351 self.collection_locator, self.api, self.api.localkeep(),
352 num_retries=self.num_retries)
353 new_collection_object = coll_reader.api_response() or {}
354 # If the Collection only exists in Keep, there will be no API
355 # response. Fill in the fields we need.
356 if 'uuid' not in new_collection_object:
357 new_collection_object['uuid'] = self.collection_locator
358 if "portable_data_hash" not in new_collection_object:
359 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
360 if 'manifest_text' not in new_collection_object:
361 new_collection_object['manifest_text'] = coll_reader.manifest_text()
362 coll_reader.normalize()
363 # end with llfuse.lock_released, re-acquire lock
# Only rebuild when the content (portable data hash) actually changed.
365 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
366 self.new_collection(new_collection_object, coll_reader)

370 except arvados.errors.NotFoundError:
371 _logger.exception("arv-mount %s: error", self.collection_locator)
372 except arvados.errors.ArgumentError as detail:
373 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
374 if self.collection_object is not None and "manifest_text" in self.collection_object:
375 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
377 _logger.exception("arv-mount %s: error", self.collection_locator)
378 if self.collection_object is not None and "manifest_text" in self.collection_object:
379 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])

382 def __getitem__(self, item):
# The special name '.arvados#collection' exposes the raw API record as a
# JSON file; the ObjectFile is created lazily on first access.
384 if item == '.arvados#collection':
385 if self.collection_object_file is None:
386 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
387 self.inodes.add_entry(self.collection_object_file)
388 return self.collection_object_file
390 return super(CollectionDirectory, self).__getitem__(item)

392 def __contains__(self, k):
393 if k == '.arvados#collection':
396 return super(CollectionDirectory, self).__contains__(k)
399 class MagicDirectory(Directory):
400 '''A special directory that logically contains the set of all extant keep
401 locators. When a file is referenced by lookup(), it is tested to see if it
402 is a valid keep locator to a manifest, and if so, loads the manifest
403 contents as a subdirectory of this directory with the locator as the
404 directory name. Since querying a list of all extant keep locators is
405 impractical, only collections that have already been accessed are visible

# NOTE(review): the README_TEXT class attribute used below is defined on
# lines not visible in this excerpt; the text that follows is part of it.
410 This directory provides access to Arvados collections as subdirectories listed
411 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
412 the form '1234567890abcdefghijklmnopqrstuv+123').

414 Note that this directory will appear empty until you attempt to access a
415 specific collection subdirectory (such as trying to 'cd' into it), at which
416 point the collection will actually be looked up on the server and the directory
417 will appear if it exists.

420 def __init__(self, parent_inode, inodes, api, num_retries):
421 super(MagicDirectory, self).__init__(parent_inode)
424 self.num_retries = num_retries

# __setattr__ is hooked so setup can run as soon as Inodes.add_entry
# assigns this directory its inode number.
426 def __setattr__(self, name, value):
427 super(MagicDirectory, self).__setattr__(name, value)
428 # When we're assigned an inode, add a README.
429 if ((name == 'inode') and (self.inode is not None) and
430 (not self._entries)):
431 self._entries['README'] = self.inodes.add_entry(
432 StringFile(self.inode, self.README_TEXT, time.time()))
433 # If we're the root directory, add an identical by_id subdirectory.
434 if self.inode == llfuse.ROOT_INODE:
435 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
436 self.inode, self.inodes, self.api, self.num_retries))

# Membership test doubles as the lazy loader: an unseen but well-formed
# locator is fetched and, if found, cached as a new subdirectory.
438 def __contains__(self, k):
439 if k in self._entries:
442 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
446 e = self.inodes.add_entry(CollectionDirectory(
447 self.inode, self.inodes, self.api, self.num_retries, k))
# Broad catch: any failure to load means "not present", logged at debug.
453 except Exception as e:
454 _logger.debug('arv-mount exception keep %s', e)

457 def __getitem__(self, item):
459 return self._entries[item]
461 raise KeyError("No collection with id " + item)
464 class RecursiveInvalidateDirectory(Directory):
# Invalidation cascades to every child entry. The root directory is not
# protected by an already-held llfuse lock, so it takes the lock itself.
465 def invalidate(self):
466 if self.inode == llfuse.ROOT_INODE:
467 llfuse.lock.acquire()
469 super(RecursiveInvalidateDirectory, self).invalidate()
470 for a in self._entries:
471 self._entries[a].invalidate()
# NOTE(review): the try/except around the loop (original lines 468, 472-473)
# is not visible in this excerpt; the release below mirrors the acquire.
475 if self.inode == llfuse.ROOT_INODE:
476 llfuse.lock.release()
479 class TagsDirectory(RecursiveInvalidateDirectory):
480 '''A special directory that contains as subdirectories all tags visible to the user.'''

482 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
483 super(TagsDirectory, self).__init__(parent_inode)
486 self.num_retries = num_retries
488 self._poll_time = poll_time

# Release the FUSE lock while listing tag links over the network.
491 with llfuse.lock_released:
492 tags = self.api.links().list(
493 filters=[['link_class', '=', 'tag']],
# Distinct names only: one subdirectory per tag, however many links use it.
494 select=['name'], distinct=True
495 ).execute(num_retries=self.num_retries)
# merge(): keep a TagDirectory if its tag name still exists, else rebuild.
497 self.merge(tags['items'],
499 lambda a, i: a.tag == i['name'],
500 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
503 class TagDirectory(Directory):
504 '''A special directory that contains as subdirectories all collections visible
505 to the user that are tagged with a particular tag.

508 def __init__(self, parent_inode, inodes, api, num_retries, tag,
509 poll=False, poll_time=60):
510 super(TagDirectory, self).__init__(parent_inode)
513 self.num_retries = num_retries
516 self._poll_time = poll_time

# Release the FUSE lock while querying tag links over the network.
519 with llfuse.lock_released:
520 taggedcollections = self.api.links().list(
521 filters=[['link_class', '=', 'tag'],
522 ['name', '=', self.tag],
523 ['head_uuid', 'is_a', 'arvados#collection']],
525 ).execute(num_retries=self.num_retries)
# merge(): entries are keyed by head_uuid; an existing CollectionDirectory
# is reused when it still points at the same collection.
526 self.merge(taggedcollections['items'],
527 lambda i: i['head_uuid'],
528 lambda a, i: a.collection_locator == i['head_uuid'],
529 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
532 class ProjectDirectory(Directory):
533 '''A special directory that contains the contents of a project.'''

535 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
536 poll=False, poll_time=60):
537 super(ProjectDirectory, self).__init__(parent_inode)
540 self.num_retries = num_retries
541 self.project_object = project_object
542 self.project_object_file = None
543 self.uuid = project_object['uuid']
545 self._poll_time = poll_time

# Build the right entry type for a contained API record, dispatching on
# which uuid pattern the record's uuid matches.
547 def createDirectory(self, i):
548 if collection_uuid_pattern.match(i['uuid']):
549 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
550 elif group_uuid_pattern.match(i['uuid']):
551 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
552 elif link_uuid_pattern.match(i['uuid']):
553 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
554 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
# Any other recognizable record is exposed as a JSON ObjectFile.
557 elif uuid_pattern.match(i['uuid']):
558 return ObjectFile(self.parent_inode, i)

# Lazily create the '.arvados#project' metadata file on first update.
563 if self.project_object_file == None:
564 self.project_object_file = ObjectFile(self.inode, self.project_object)
565 self.inodes.add_entry(self.project_object_file)

# Derive a display name for a record; nameless records are skipped.
569 if i['name'] is None or len(i['name']) == 0:
571 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
572 # collection or subproject
574 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
# Other object kinds get a '.kind' suffix, e.g. 'foo.specimen'
# ([8:] strips the 'arvados#' prefix).
577 elif 'kind' in i and i['kind'].startswith('arvados#'):
579 return "{}.{}".format(i['name'], i['kind'][8:])

# Decide whether an existing entry still corresponds to record i.
584 if isinstance(a, CollectionDirectory):
585 return a.collection_locator == i['uuid']
586 elif isinstance(a, ProjectDirectory):
587 return a.uuid == i['uuid']
588 elif isinstance(a, ObjectFile):
589 return a.uuid == i['uuid'] and not a.stale()

# Release the FUSE lock while refetching the project and its contents.
592 with llfuse.lock_released:
593 if group_uuid_pattern.match(self.uuid):
594 self.project_object = self.api.groups().get(
595 uuid=self.uuid).execute(num_retries=self.num_retries)
596 elif user_uuid_pattern.match(self.uuid):
597 self.project_object = self.api.users().get(
598 uuid=self.uuid).execute(num_retries=self.num_retries)
600 contents = arvados.util.list_all(self.api.groups().contents,
601 self.num_retries, uuid=self.uuid)
602 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
603 contents += arvados.util.list_all(
604 self.api.links().list, self.num_retries,
605 filters=[['tail_uuid', '=', self.uuid],
606 ['link_class', '=', 'name']])
608 # end with llfuse.lock_released, re-acquire lock
613 self.createDirectory)

615 def __getitem__(self, item):
# '.arvados#project' exposes the project's raw API record as JSON.
617 if item == '.arvados#project':
618 return self.project_object_file
620 return super(ProjectDirectory, self).__getitem__(item)

622 def __contains__(self, k):
623 if k == '.arvados#project':
626 return super(ProjectDirectory, self).__contains__(k)
629 class SharedDirectory(Directory):
630 '''A special directory that represents users or groups who have shared projects with me.'''

632 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
633 poll=False, poll_time=60):
634 super(SharedDirectory, self).__init__(parent_inode)
637 self.num_retries = num_retries
# Fetch the current user once so ownership can be compared during update.
638 self.current_user = api.users().current().execute(num_retries=num_retries)
640 self._poll_time = poll_time

# Release the FUSE lock for the whole multi-request discovery sequence.
643 with llfuse.lock_released:
644 all_projects = arvados.util.list_all(
645 self.api.groups().list, self.num_retries,
646 filters=[['group_class','=','project']])
# Index every visible project by uuid.
648 for ob in all_projects:
649 objects[ob['uuid']] = ob

# A "root owner" is an owner outside both my account and the visible
# project set — i.e. the user/group who shared the project with me.
653 for ob in all_projects:
654 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
656 root_owners[ob['owner_uuid']] = True

# Resolve root owners to user and group records.
658 lusers = arvados.util.list_all(
659 self.api.users().list, self.num_retries,
660 filters=[['uuid','in', list(root_owners)]])
661 lgroups = arvados.util.list_all(
662 self.api.groups().list, self.num_retries,
663 filters=[['uuid','in', list(root_owners)]])
669 objects[l["uuid"]] = l
671 objects[l["uuid"]] = l

# Build the display-name -> record map shown at the top level; users are
# listed under "First Last", groups/projects under their name.
674 for r in root_owners:
678 contents[obr["name"]] = obr
679 if "first_name" in obr:
680 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
683 if r['owner_uuid'] not in objects:
684 contents[r['name']] = r
686 # end with llfuse.lock_released, re-acquire lock

# merge(): reuse a ProjectDirectory whose uuid still matches the record.
689 self.merge(contents.items(),
691 lambda a, i: a.uuid == i[1]['uuid'],
692 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
697 class FileHandle(object):
698 '''Connects a numeric file handle to a File or Directory object that has
699 been opened by the client.'''

# fh: the numeric handle issued by Operations; entry: the opened object
# (or, for opendir, the snapshotted entry list). The attribute
# assignments (original lines 702-703) are not visible in this excerpt.
701 def __init__(self, fh, entry):
706 class Inodes(object):
707 '''Manage the set of inodes. This is the mapping from a numeric id
708 to a concrete File or Directory object'''

# Inode numbers are handed out sequentially starting at the FUSE root.
712 self._counter = itertools.count(llfuse.ROOT_INODE)

714 def __getitem__(self, item):
715 return self._entries[item]

717 def __setitem__(self, key, item):
718 self._entries[key] = item

# Iteration and items() delegate to the underlying dict.
721 return self._entries.iterkeys()

724 return self._entries.items()

726 def __contains__(self, k):
727 return k in self._entries

# Assign the next inode number to 'entry' and register it.
729 def add_entry(self, entry):
730 entry.inode = next(self._counter)
731 self._entries[entry.inode] = entry

# Tell the kernel the inode is gone, then forget it.
734 def del_entry(self, entry):
735 llfuse.invalidate_inode(entry.inode)
736 del self._entries[entry.inode]
738 class Operations(llfuse.Operations):
739 '''This is the main interface with llfuse. The methods on this object are
740 called by llfuse threads to service FUSE events to query and read from
743 llfuse has its own global lock which is acquired before calling a request handler,
744 so request handlers do not run concurrently unless the lock is explicitly released
745 using "with llfuse.lock_released:"'''

# uid/gid: ownership reported for every entry; encoding: charset used to
# decode/encode filenames at the FUSE boundary.
747 def __init__(self, uid, gid, encoding="utf-8"):
748 super(Operations, self).__init__()
750 self.inodes = Inodes()
753 self.encoding = encoding

755 # dict of inode to filehandle
756 self._filehandles = {}
757 self._filehandles_counter = 1

759 # Other threads that need to wait until the fuse driver
760 # is fully initialized should wait() on this event object.
761 self.initlock = threading.Event()

764 # Allow threads that are waiting for the driver to be finished
765 # initializing to continue

# Read-only filesystem: access decisions need no bookkeeping here.
768 def access(self, inode, mode, ctx):

771 def getattr(self, inode):
772 if inode not in self.inodes:
773 raise llfuse.FUSEError(errno.ENOENT)

775 e = self.inodes[inode]

777 entry = llfuse.EntryAttributes()
# Let the kernel cache attributes/entries for five minutes.
780 entry.entry_timeout = 300
781 entry.attr_timeout = 300

# Everything is world-readable; directories and stream files also get
# the execute bits (needed to traverse directories).
783 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
784 if isinstance(e, Directory):
785 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
786 elif isinstance(e, StreamReaderFile):
787 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
789 entry.st_mode |= stat.S_IFREG

792 entry.st_uid = self.uid
793 entry.st_gid = self.gid

796 entry.st_size = e.size()

798 entry.st_blksize = 512
799 entry.st_blocks = (e.size()/512)+1
800 entry.st_atime = int(e.atime())
801 entry.st_mtime = int(e.mtime())
802 entry.st_ctime = int(e.mtime())

806 def lookup(self, parent_inode, name):
# Filenames arrive as bytes from the kernel; decode before comparing.
807 name = unicode(name, self.encoding)
808 _logger.debug("arv-mount lookup: parent_inode %i name %s",
815 if parent_inode in self.inodes:
816 p = self.inodes[parent_inode]
# '..' resolves to the parent inode; anything else is a child lookup.
818 inode = p.parent_inode
819 elif isinstance(p, Directory) and name in p:
820 inode = p[name].inode
823 return self.getattr(inode)
825 raise llfuse.FUSEError(errno.ENOENT)

827 def open(self, inode, flags):
828 if inode in self.inodes:
829 p = self.inodes[inode]
831 raise llfuse.FUSEError(errno.ENOENT)

# This is a read-only filesystem: refuse any write-capable open.
833 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
834 raise llfuse.FUSEError(errno.EROFS)

836 if isinstance(p, Directory):
837 raise llfuse.FUSEError(errno.EISDIR)

839 fh = self._filehandles_counter
840 self._filehandles_counter += 1
841 self._filehandles[fh] = FileHandle(fh, p)

844 def read(self, fh, off, size):
845 _logger.debug("arv-mount read %i %i %i", fh, off, size)
846 if fh in self._filehandles:
847 handle = self._filehandles[fh]
849 raise llfuse.FUSEError(errno.EBADF)

# Record access time on the underlying entry.
852 handle.entry._atime = time.time()

# Keep reads hit the network; drop the global lock for the duration.
855 with llfuse.lock_released:
856 return handle.entry.readfrom(off, size)
857 except arvados.errors.NotFoundError as e:
858 _logger.warning("Block not found: " + str(e))
859 raise llfuse.FUSEError(errno.EIO)
862 raise llfuse.FUSEError(errno.EIO)

864 def release(self, fh):
865 if fh in self._filehandles:
866 del self._filehandles[fh]

868 def opendir(self, inode):
869 _logger.debug("arv-mount opendir: inode %i", inode)

871 if inode in self.inodes:
872 p = self.inodes[inode]
874 raise llfuse.FUSEError(errno.ENOENT)

876 if not isinstance(p, Directory):
877 raise llfuse.FUSEError(errno.ENOTDIR)

879 fh = self._filehandles_counter
880 self._filehandles_counter += 1
881 if p.parent_inode in self.inodes:
882 parent = self.inodes[p.parent_inode]
884 raise llfuse.FUSEError(errno.EIO)

887 p._atime = time.time()

# Snapshot the listing (including '.' and '..') so readdir can iterate
# a stable list even if the directory contents change afterwards.
889 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))

892 def readdir(self, fh, off):
893 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

895 if fh in self._filehandles:
896 handle = self._filehandles[fh]
898 raise llfuse.FUSEError(errno.EBADF)

900 _logger.debug("arv-mount handle.entry %s", handle.entry)

# Yield (name, attrs, next-offset) tuples; entries whose inode has been
# dropped, or whose name cannot be encoded, are skipped.
903 while e < len(handle.entry):
904 if handle.entry[e][1].inode in self.inodes:
906 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
907 except UnicodeEncodeError:

911 def releasedir(self, fh):
912 del self._filehandles[fh]

# statfs: advertise a 64 KiB block size (remaining fields are on lines
# not visible in this excerpt).
915 st = llfuse.StatvfsData()
916 st.f_bsize = 64 * 1024

929 # The llfuse documentation recommends only overloading functions that
930 # are actually implemented, as the default implementation will raise ENOSYS.
931 # However, there is a bug in the llfuse default implementation of create()
932 # "create() takes exactly 5 positional arguments (6 given)" which will crash
934 # The workaround is to implement it with the proper number of parameters,
935 # and then everything works out.
936 def create(self, p1, p2, p3, p4, p5):
937 raise llfuse.FUSEError(errno.EROFS)