2 # FUSE driver for Arvados Keep
25 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# Module-level logger; the classes in this file report warnings, errors and
# debug traces through it.
27 _logger = logging.getLogger('arvados.arvados_fuse')
29 # Match any character which FUSE or Linux cannot accommodate as part
30 # of a filename. (If present in a collection filename, they will
31 # appear as underscores in the fuse mount.)
# The character class below covers exactly two characters: NUL and '/'.
32 _disallowed_filename_characters = re.compile('[\x00/]')
35 """Parse Arvados timestamp to unix time."""
# parse_datetime_unaware() presumably yields a naive datetime (going by its
# name -- confirm against the ciso8601 docs); calendar.timegm() then
# interprets the resulting time tuple as UTC.
39 return calendar.timegm(ciso8601.parse_datetime_unaware(t).timetuple())
# Reached when the value has the wrong type or cannot be parsed.
40 except (TypeError, ValueError):
43 def sanitize_filename(dirty):
44 '''Replace disallowed filename characters with harmless "_".'''
# Each character matched by _disallowed_filename_characters (NUL or '/') in
# `dirty` is replaced by a single underscore.
54 return _disallowed_filename_characters.sub('_', dirty)
57 class FreshBase(object):
58 '''Base class for maintaining fresh/stale state to determine when to update.'''
# Both timestamps start out as the construction time. Elsewhere in this file
# _atime is refreshed on read() and opendir().
62 self._last_update = time.time()
63 self._atime = time.time()
66 # Mark the value as stale
70 # Test if the entries dict is stale.
# True when the latest access (_atime) is more than _poll_time seconds later
# than the last recorded update.
75 return (self._last_update + self._poll_time) < self._atime
# Record the current time as the moment of the latest update.
80 self._last_update = time.time()
85 class File(FreshBase):
86 '''Base for file objects.'''
88 def __init__(self, parent_inode, _mtime=0):
89 super(File, self).__init__()
# Inode number recorded as this file's parent (callers in this file pass the
# inode of a directory).
91 self.parent_inode = parent_inode
# StreamReaderFile and StringFile override readfrom() to supply actual data.
97 def readfrom(self, off, size):
104 class StreamReaderFile(File):
105 '''Wraps a StreamFileReader as a file.'''
107 def __init__(self, parent_inode, reader, _mtime):
108 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
# The size is whatever the wrapped reader reports.
112 return self.reader.size()
114 def readfrom(self, off, size):
# Reads are delegated to the wrapped reader with the same offset and size.
115 return self.reader.readfrom(off, size)
121 class StringFile(File):
122 '''Wrap a simple string as a file'''
123 def __init__(self, parent_inode, contents, _mtime):
124 super(StringFile, self).__init__(parent_inode, _mtime)
125 self.contents = contents
# The size is the length of the stored contents.
128 return len(self.contents)
130 def readfrom(self, off, size):
# Plain slice: a range reaching past the end yields a shorter (possibly
# empty) result instead of raising.
131 return self.contents[off:(off+size)]
134 class ObjectFile(StringFile):
135 '''Wrap a dict as a serialized json object.'''
137 def __init__(self, parent_inode, obj):
# Start out with empty contents and an mtime of 0.
138 super(ObjectFile, self).__init__(parent_inode, "", 0)
# Remember which record this file represents; `obj` must carry a 'uuid' key.
139 self.uuid = obj['uuid']
142 def update(self, obj):
# A record without 'modified_at' gets an mtime of 0.
143 self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
# Contents are the record as indented JSON with sorted keys and a trailing newline.
144 self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
147 class Directory(FreshBase):
148 '''Generic directory object, backed by a dict.
149 Consists of a set of entries with the key representing the filename
150 and the value referencing a File or Directory object.
153 def __init__(self, parent_inode):
154 super(Directory, self).__init__()
156 '''parent_inode is the integer inode number'''
# Fail fast when the caller passes something other than an int.
158 if not isinstance(parent_inode, int):
159 raise Exception("parent_inode should be an int")
160 self.parent_inode = parent_inode
# The modification time starts out as the construction time.
162 self._mtime = time.time()
164 # Overridden by subclasses to implement logic to update the entries dict
165 # when the directory is stale
169 # Only used when computing the size of the disk footprint of the directory
174 def checkupdate(self):
# HTTP errors raised by the API client are intercepted here.
178 except apiclient.errors.HttpError as e:
181 def __getitem__(self, item):
183 return self._entries[item]
187 return self._entries.items()
191 return self._entries.iterkeys()
193 def __contains__(self, k):
195 return k in self._entries
197 def merge(self, items, fn, same, new_entry):
198 '''Helper method for updating the contents of the directory. Takes a list
199 describing the new contents of the directory, reuse entries that are
200 the same in both the old and new lists, create new entries, and delete
201 old entries missing from the new list.
203 items: iterable with new directory contents
205 fn: function to take an entry in 'items' and return the desired file or
206 directory name, or None if this entry should be skipped
208 same: function to compare an existing entry (a File or Directory
209 object) with an entry in the items list to determine whether to keep
212 new_entry: function to create a new directory entry (File or Directory
213 object) from an entry in the items list.
217 oldentries = self._entries
# The name produced by fn() is sanitized before it is used as a dict key.
221 name = sanitize_filename(fn(i))
223 if name in oldentries and same(oldentries[name], i):
224 # move existing directory entry over
225 self._entries[name] = oldentries[name]
228 # create new directory entry
# The new object is registered through self.inodes.add_entry().
231 self._entries[name] = self.inodes.add_entry(ent)
234 # delete any other directory entries that were not found in 'items'
# Invalidate the cached directory entry for the old name, then remove the
# object from the inode table.
236 llfuse.invalidate_entry(self.inode, str(i))
237 self.inodes.del_entry(oldentries[i])
# Stamp the directory's mtime with the current time.
241 self._mtime = time.time()
246 '''Delete all entries'''
247 oldentries = self._entries
250 if isinstance(n, Directory):
252 llfuse.invalidate_entry(self.inode, str(n))
253 self.inodes.del_entry(oldentries[n])
# Finally invalidate this directory's own inode.
254 llfuse.invalidate_inode(self.inode)
261 class CollectionDirectory(Directory):
262 '''Represents the root of a directory tree holding a collection.'''
264 def __init__(self, parent_inode, inodes, api, num_retries, collection):
265 super(CollectionDirectory, self).__init__(parent_inode)
268 self.num_retries = num_retries
269 self.collection_object_file = None
270 self.collection_object = None
# `collection` is either an API record (a dict carrying 'uuid') or the
# locator itself.
271 if isinstance(collection, dict):
272 self.collection_locator = collection['uuid']
273 self._mtime = convertTime(collection.get('modified_at'))
275 self.collection_locator = collection
# Matches when either the record's uuid or its portable data hash equals
# this directory's locator.
279 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
281 # Used by arv-web.py to switch the contents of the CollectionDirectory
282 def change_collection(self, new_locator):
283 """Switch the contents of the CollectionDirectory. Must be called with llfuse.lock held."""
284 self.collection_locator = new_locator
# Forget the previously loaded record.
285 self.collection_object = None
288 def new_collection(self, new_collection_object, coll_reader):
289 self.collection_object = new_collection_object
291 self._mtime = convertTime(self.collection_object.get('modified_at'))
# Keep an already created '.arvados#collection' file in sync with the new record.
293 if self.collection_object_file is not None:
294 self.collection_object_file.update(self.collection_object)
# Build the tree: one Directory per path component of each stream name, then
# one StreamReaderFile per file in the stream.
297 for s in coll_reader.all_streams():
299 for part in s.name().split('/'):
# Empty components and '.' do not introduce a directory level.
300 if part != '' and part != '.':
301 partname = sanitize_filename(part)
302 if partname not in cwd._entries:
303 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
304 cwd = cwd._entries[partname]
305 for k, v in s.files().items():
306 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
# Case: a record is already loaded and the locator has portable-data-hash form.
310 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
# Case: no locator has been set.
313 if self.collection_locator is None:
# The llfuse lock is released while the collection is fetched and normalized.
317 with llfuse.lock_released:
318 coll_reader = arvados.CollectionReader(
319 self.collection_locator, self.api, self.api.keep,
320 num_retries=self.num_retries)
321 new_collection_object = coll_reader.api_response() or {}
322 # If the Collection only exists in Keep, there will be no API
323 # response. Fill in the fields we need.
324 if 'uuid' not in new_collection_object:
325 new_collection_object['uuid'] = self.collection_locator
326 if "portable_data_hash" not in new_collection_object:
327 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
328 if 'manifest_text' not in new_collection_object:
329 new_collection_object['manifest_text'] = coll_reader.manifest_text()
330 coll_reader.normalize()
331 # end with llfuse.lock_released, re-acquire lock
# Rebuild the tree only when nothing was loaded before or the portable data
# hash differs from the loaded one.
333 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
334 self.new_collection(new_collection_object, coll_reader)
# Not found: logged with traceback via _logger.exception().
338 except arvados.errors.NotFoundError:
339 _logger.exception("arv-mount %s: error", self.collection_locator)
# Argument errors: logged as warnings, together with the manifest text when
# a record is loaded.
340 except arvados.errors.ArgumentError as detail:
341 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
342 if self.collection_object is not None and "manifest_text" in self.collection_object:
343 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
345 _logger.exception("arv-mount %s: error", self.collection_locator)
346 if self.collection_object is not None and "manifest_text" in self.collection_object:
347 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
350 def __getitem__(self, item):
# The collection record is exposed as '.arvados#collection'; its ObjectFile
# is created and registered on first request, then reused.
352 if item == '.arvados#collection':
353 if self.collection_object_file is None:
354 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
355 self.inodes.add_entry(self.collection_object_file)
356 return self.collection_object_file
358 return super(CollectionDirectory, self).__getitem__(item)
360 def __contains__(self, k):
361 if k == '.arvados#collection':
364 return super(CollectionDirectory, self).__contains__(k)
367 class MagicDirectory(Directory):
368 '''A special directory that logically contains the set of all extant keep
369 locators. When a file is referenced by lookup(), it is tested to see if it
370 is a valid keep locator to a manifest, and if so, loads the manifest
371 contents as a subdirectory of this directory with the locator as the
372 directory name. Since querying a list of all extant keep locators is
373 impractical, only collections that have already been accessed are visible
378 This directory provides access to Arvados collections as subdirectories listed
379 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
380 the form '1234567890abcdefghijklmnopqrstuv+123').
382 Note that this directory will appear empty until you attempt to access a
383 specific collection subdirectory (such as trying to 'cd' into it), at which
384 point the collection will actually be looked up on the server and the directory
385 will appear if it exists.
388 def __init__(self, parent_inode, inodes, api, num_retries):
389 super(MagicDirectory, self).__init__(parent_inode)
392 self.num_retries = num_retries
# Hook attribute assignment so the directory can populate itself as soon as
# it has been given an inode.
394 def __setattr__(self, name, value):
395 super(MagicDirectory, self).__setattr__(name, value)
396 # When we're assigned an inode, add a README.
# Only done while the entries dict is still empty.
397 if ((name == 'inode') and (self.inode is not None) and
398 (not self._entries)):
399 self._entries['README'] = self.inodes.add_entry(
400 StringFile(self.inode, self.README_TEXT, time.time()))
401 # If we're the root directory, add an identical by_id subdirectory.
402 if self.inode == llfuse.ROOT_INODE:
403 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
404 self.inode, self.inodes, self.api, self.num_retries))
406 def __contains__(self, k):
# Case: the name is already a known entry.
407 if k in self._entries:
# Case: the name matches neither the portable-data-hash nor the uuid pattern.
410 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
# Create and register a CollectionDirectory for the requested identifier.
414 e = self.inodes.add_entry(CollectionDirectory(
415 self.inode, self.inodes, self.api, self.num_retries, k))
# Any exception raised here is logged at debug level.
421 except Exception as e:
422 _logger.debug('arv-mount exception keep %s', e)
425 def __getitem__(self, item):
427 return self._entries[item]
429 raise KeyError("No collection with id " + item)
# Directory whose invalidate() also invalidates every entry it contains.
# When the directory is the filesystem root (llfuse.ROOT_INODE), llfuse.lock
# is acquired before and released after the operation.
432 class RecursiveInvalidateDirectory(Directory):
433 def invalidate(self):
434 if self.inode == llfuse.ROOT_INODE:
435 llfuse.lock.acquire()
# Invalidate this directory first, then each of its entries.
437 super(RecursiveInvalidateDirectory, self).invalidate()
438 for a in self._entries:
439 self._entries[a].invalidate()
# Mirror of the acquire above: only the root directory holds the lock.
443 if self.inode == llfuse.ROOT_INODE:
444 llfuse.lock.release()
447 class TagsDirectory(RecursiveInvalidateDirectory):
448 '''A special directory that contains as subdirectories all tags visible to the user.'''
450 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
451 super(TagsDirectory, self).__init__(parent_inode)
454 self.num_retries = num_retries
456 self._poll_time = poll_time
# The llfuse lock is released while the API is queried for the distinct
# names of all links with link_class 'tag'.
459 with llfuse.lock_released:
460 tags = self.api.links().list(
461 filters=[['link_class', '=', 'tag']],
462 select=['name'], distinct=True
463 ).execute(num_retries=self.num_retries)
# An existing entry is reused when its tag equals the link's name; otherwise
# a TagDirectory is created with this directory's polling settings.
465 self.merge(tags['items'],
467 lambda a, i: a.tag == i['name'],
468 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
471 class TagDirectory(Directory):
472 '''A special directory that contains as subdirectories all collections visible
473 to the user that are tagged with a particular tag.
476 def __init__(self, parent_inode, inodes, api, num_retries, tag,
477 poll=False, poll_time=60):
478 super(TagDirectory, self).__init__(parent_inode)
481 self.num_retries = num_retries
484 self._poll_time = poll_time
# With the llfuse lock released, list the tag links that carry this
# directory's tag name and whose head is a collection.
487 with llfuse.lock_released:
488 taggedcollections = self.api.links().list(
489 filters=[['link_class', '=', 'tag'],
490 ['name', '=', self.tag],
491 ['head_uuid', 'is_a', 'arvados#collection']],
493 ).execute(num_retries=self.num_retries)
# Entries are named by the link's head_uuid; an existing CollectionDirectory
# is reused when its locator equals that head_uuid.
494 self.merge(taggedcollections['items'],
495 lambda i: i['head_uuid'],
496 lambda a, i: a.collection_locator == i['head_uuid'],
497 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
500 class ProjectDirectory(Directory):
501 '''A special directory that contains the contents of a project.'''
503 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
504 poll=False, poll_time=60):
505 super(ProjectDirectory, self).__init__(parent_inode)
508 self.num_retries = num_retries
509 self.project_object = project_object
510 self.project_object_file = None
511 self.uuid = project_object['uuid']
513 self._poll_time = poll_time
# Choose the kind of entry to create from the form of the record's uuid.
515 def createDirectory(self, i):
516 if collection_uuid_pattern.match(i['uuid']):
517 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
518 elif group_uuid_pattern.match(i['uuid']):
519 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
520 elif link_uuid_pattern.match(i['uuid']):
# A link yields a collection directory for its head when the head is a
# collection or its head_uuid has portable-data-hash form.
521 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
522 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
525 elif uuid_pattern.match(i['uuid']):
# NOTE(review): this passes self.parent_inode whereas the branches above pass
# self.inode as the new entry's parent -- confirm this is intentional.
526 return ObjectFile(self.parent_inode, i)
# Create and register the ObjectFile exposing this project's own record the
# first time through; __getitem__ serves it as '.arvados#project'.
531 if self.project_object_file == None:
532 self.project_object_file = ObjectFile(self.inode, self.project_object)
533 self.inodes.add_entry(self.project_object_file)
# Case: the record has no name or an empty one.
537 if i['name'] is None or len(i['name']) == 0:
539 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
540 # collection or subproject
542 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
545 elif 'kind' in i and i['kind'].startswith('arvados#'):
# Named '<name>.<kind>' with the 8-character 'arvados#' prefix removed from kind.
547 return "{}.{}".format(i['name'], i['kind'][8:])
# An existing entry is matched to a record by uuid; an ObjectFile must
# additionally not be stale.
552 if isinstance(a, CollectionDirectory):
553 return a.collection_locator == i['uuid']
554 elif isinstance(a, ProjectDirectory):
555 return a.uuid == i['uuid']
556 elif isinstance(a, ObjectFile):
557 return a.uuid == i['uuid'] and not a.stale()
# With the llfuse lock released, refresh this directory's own record from the
# groups or users API (depending on the form of its uuid) and list its contents.
560 with llfuse.lock_released:
561 if group_uuid_pattern.match(self.uuid):
562 self.project_object = self.api.groups().get(
563 uuid=self.uuid).execute(num_retries=self.num_retries)
564 elif user_uuid_pattern.match(self.uuid):
565 self.project_object = self.api.users().get(
566 uuid=self.uuid).execute(num_retries=self.num_retries)
568 contents = arvados.util.list_all(self.api.groups().contents,
569 self.num_retries, uuid=self.uuid)
570 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
571 contents += arvados.util.list_all(
572 self.api.links().list, self.num_retries,
573 filters=[['tail_uuid', '=', self.uuid],
574 ['link_class', '=', 'name']])
576 # end with llfuse.lock_released, re-acquire lock
581 self.createDirectory)
583 def __getitem__(self, item):
585 if item == '.arvados#project':
586 return self.project_object_file
588 return super(ProjectDirectory, self).__getitem__(item)
590 def __contains__(self, k):
591 if k == '.arvados#project':
594 return super(ProjectDirectory, self).__contains__(k)
597 class SharedDirectory(Directory):
598 '''A special directory that represents users or groups who have shared projects with me.'''
600 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
601 poll=False, poll_time=60):
602 super(SharedDirectory, self).__init__(parent_inode)
605 self.num_retries = num_retries
# The current user's record is fetched once, at construction time.
606 self.current_user = api.users().current().execute(num_retries=num_retries)
608 self._poll_time = poll_time
# The llfuse lock is released while the API is queried.
611 with llfuse.lock_released:
612 all_projects = arvados.util.list_all(
613 self.api.groups().list, self.num_retries,
614 filters=[['group_class','=','project']])
# Index every listed project by uuid.
616 for ob in all_projects:
617 objects[ob['uuid']] = ob
# An owner counts as a root owner when it is not the current user and is not
# itself one of the listed projects.
621 for ob in all_projects:
622 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
624 root_owners[ob['owner_uuid']] = True
# Fetch the user records and the group records for those owners.
626 lusers = arvados.util.list_all(
627 self.api.users().list, self.num_retries,
628 filters=[['uuid','in', list(root_owners)]])
629 lgroups = arvados.util.list_all(
630 self.api.groups().list, self.num_retries,
631 filters=[['uuid','in', list(root_owners)]])
637 objects[l["uuid"]] = l
639 objects[l["uuid"]] = l
642 for r in root_owners:
646 contents[obr["name"]] = obr
# Records carrying 'first_name' are keyed by "first_name last_name".
647 if "first_name" in obr:
648 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
# Records whose owner is not among the known objects are listed under their
# own name.
651 if r['owner_uuid'] not in objects:
652 contents[r['name']] = r
654 # end with llfuse.lock_released, re-acquire lock
# Items handed to merge() are (name, record) pairs; an existing entry is
# reused when its uuid equals the record's uuid.
657 self.merge(contents.items(),
659 lambda a, i: a.uuid == i[1]['uuid'],
660 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
665 class FileHandle(object):
666 '''Connects a numeric file handle to a File or Directory object that has
667 been opened by the client.'''
# fh is the numeric handle. In this file `entry` is a File object for open(),
# and a list of (name, object) pairs for opendir().
669 def __init__(self, fh, entry):
674 class Inodes(object):
675 '''Manage the set of inodes. This is the mapping from a numeric id
676 to a concrete File or Directory object'''
# Inode numbers are handed out sequentially starting at llfuse.ROOT_INODE, so
# the first entry added receives the root inode.
680 self._counter = itertools.count(llfuse.ROOT_INODE)
682 def __getitem__(self, item):
683 return self._entries[item]
685 def __setitem__(self, key, item):
686 self._entries[key] = item
689 return self._entries.iterkeys()
692 return self._entries.items()
694 def __contains__(self, k):
695 return k in self._entries
697 def add_entry(self, entry):
# Assign the next free inode number to the entry and index the entry under it.
698 entry.inode = next(self._counter)
699 self._entries[entry.inode] = entry
702 def del_entry(self, entry):
# Invalidate the inode through llfuse before dropping the entry from the
# table. Raises KeyError if the entry's inode is not in the table.
703 llfuse.invalidate_inode(entry.inode)
704 del self._entries[entry.inode]
706 class Operations(llfuse.Operations):
707 '''This is the main interface with llfuse. The methods on this object are
708 called by llfuse threads to service FUSE events to query and read from
711 llfuse has its own global lock which is acquired before calling a request handler,
712 so request handlers do not run concurrently unless the lock is explicitly released
713 using "with llfuse.lock_released:"'''
715 def __init__(self, uid, gid, encoding="utf-8"):
716 super(Operations, self).__init__()
718 self.inodes = Inodes()
# Encoding used to decode names in lookup() and to encode them in readdir().
721 self.encoding = encoding
723 # dict of numeric file handle to FileHandle object
724 self._filehandles = {}
# Next handle number to hand out; shared by open() and opendir().
725 self._filehandles_counter = 1
727 # Other threads that need to wait until the fuse driver
728 # is fully initialized should wait() on this event object.
729 self.initlock = threading.Event()
732 # Allow threads that are waiting for the driver to be finished
733 # initializing to continue
736 def access(self, inode, mode, ctx):
739 def getattr(self, inode):
# Unknown inodes are reported as ENOENT.
740 if inode not in self.inodes:
741 raise llfuse.FUSEError(errno.ENOENT)
743 e = self.inodes[inode]
745 entry = llfuse.EntryAttributes()
# Entry and attribute timeouts are both set to 300.
748 entry.entry_timeout = 300
749 entry.attr_timeout = 300
# Everything is readable by user, group and others. Directories and
# StreamReaderFile entries also get the execute bits; any other entry is a
# plain regular file.
751 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
752 if isinstance(e, Directory):
753 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
754 elif isinstance(e, StreamReaderFile):
755 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
757 entry.st_mode |= stat.S_IFREG
# Ownership is always the uid/gid stored on this Operations object.
760 entry.st_uid = self.uid
761 entry.st_gid = self.gid
764 entry.st_size = e.size()
766 entry.st_blksize = 512
# Block count: size divided by 512, plus one.
767 entry.st_blocks = (e.size()/512)+1
768 entry.st_atime = int(e.atime())
769 entry.st_mtime = int(e.mtime())
# ctime is reported as the same value as mtime.
770 entry.st_ctime = int(e.mtime())
774 def lookup(self, parent_inode, name):
# Decode the name handed in by llfuse using the configured encoding.
775 name = unicode(name, self.encoding)
776 _logger.debug("arv-mount lookup: parent_inode %i name %s",
783 if parent_inode in self.inodes:
784 p = self.inodes[parent_inode]
786 inode = p.parent_inode
# Membership test and item access go through the directory's own
# __contains__ / __getitem__.
787 elif isinstance(p, Directory) and name in p:
788 inode = p[name].inode
791 return self.getattr(inode)
793 raise llfuse.FUSEError(errno.ENOENT)
795 def open(self, inode, flags):
796 if inode in self.inodes:
797 p = self.inodes[inode]
799 raise llfuse.FUSEError(errno.ENOENT)
# Opening for writing is refused with EROFS.
801 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
802 raise llfuse.FUSEError(errno.EROFS)
804 if isinstance(p, Directory):
805 raise llfuse.FUSEError(errno.EISDIR)
# Allocate the next handle number and bind it to the opened object.
807 fh = self._filehandles_counter
808 self._filehandles_counter += 1
809 self._filehandles[fh] = FileHandle(fh, p)
812 def read(self, fh, off, size):
813 _logger.debug("arv-mount read %i %i %i", fh, off, size)
814 if fh in self._filehandles:
815 handle = self._filehandles[fh]
817 raise llfuse.FUSEError(errno.EBADF)
# Record the access time on the file object.
820 handle.entry._atime = time.time()
# The llfuse lock is released while the data is read.
823 with llfuse.lock_released:
824 return handle.entry.readfrom(off, size)
# A missing block is logged as a warning and reported as EIO.
825 except arvados.errors.NotFoundError as e:
826 _logger.warning("Block not found: " + str(e))
827 raise llfuse.FUSEError(errno.EIO)
830 raise llfuse.FUSEError(errno.EIO)
832 def release(self, fh):
# Releasing a handle that is not registered is a no-op.
833 if fh in self._filehandles:
834 del self._filehandles[fh]
836 def opendir(self, inode):
837 _logger.debug("arv-mount opendir: inode %i", inode)
839 if inode in self.inodes:
840 p = self.inodes[inode]
842 raise llfuse.FUSEError(errno.ENOENT)
844 if not isinstance(p, Directory):
845 raise llfuse.FUSEError(errno.ENOTDIR)
847 fh = self._filehandles_counter
848 self._filehandles_counter += 1
# The parent directory must itself be a registered inode; otherwise EIO.
849 if p.parent_inode in self.inodes:
850 parent = self.inodes[p.parent_inode]
852 raise llfuse.FUSEError(errno.EIO)
855 p._atime = time.time()
# Snapshot the listing ('.', '..', then the directory's items) so that
# readdir() can index into it by offset.
857 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
860 def readdir(self, fh, off):
861 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
863 if fh in self._filehandles:
864 handle = self._filehandles[fh]
866 raise llfuse.FUSEError(errno.EBADF)
868 _logger.debug("arv-mount handle.entry %s", handle.entry)
871 while e < len(handle.entry):
# Only entries whose inode is still registered are examined further.
872 if handle.entry[e][1].inode in self.inodes:
# Yields (name encoded with self.encoding, attributes from getattr(), index
# of the following entry).
874 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
875 except UnicodeEncodeError:
879 def releasedir(self, fh):
# NOTE(review): unlike release(), there is no membership check here, so an
# unknown fh raises KeyError -- confirm this is intended.
880 del self._filehandles[fh]
883 st = llfuse.StatvfsData()
884 st.f_bsize = 64 * 1024
897 # The llfuse documentation recommends only overloading functions that
898 # are actually implemented, as the default implementation will raise ENOSYS.
899 # However, there is a bug in the llfuse default implementation of create()
900 # "create() takes exactly 5 positional arguments (6 given)" which will crash
902 # The workaround is to implement it with the proper number of parameters,
903 # and then everything works out.
904 def create(self, inode_parent, name, mode, flags, ctx):
# Creating files is always refused with EROFS.
905 raise llfuse.FUSEError(errno.EROFS)