2 # FUSE driver for Arvados Keep
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
26 _logger = logging.getLogger('arvados.arvados_fuse')
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
34 """Parse Arvados timestamp to unix time."""
38 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
39 except (TypeError, ValueError):
42 def sanitize_filename(dirty):
43 '''Replace disallowed filename characters with harmless "_".'''
53 return _disallowed_filename_characters.sub('_', dirty)
56 class FreshBase(object):
57 '''Base class for maintaining fresh/stale state to determine when to update.'''
61 self._last_update = time.time()
62 self._atime = time.time()
65 # Mark the value as stale
69 # Test if the entries dict is stale.
74 return (self._last_update + self._poll_time) < self._atime
79 self._last_update = time.time()
84 class File(FreshBase):
85 '''Base for file objects.'''
87 def __init__(self, parent_inode, _mtime=0):
88 super(File, self).__init__()
90 self.parent_inode = parent_inode
96 def readfrom(self, off, size):
103 class StreamReaderFile(File):
104 '''Wraps a StreamFileReader as a file.'''
106 def __init__(self, parent_inode, reader, _mtime):
107 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
111 return self.reader.size()
113 def readfrom(self, off, size):
114 return self.reader.readfrom(off, size)
120 class StringFile(File):
121 '''Wrap a simple string as a file'''
122 def __init__(self, parent_inode, contents, _mtime):
123 super(StringFile, self).__init__(parent_inode, _mtime)
124 self.contents = contents
127 return len(self.contents)
129 def readfrom(self, off, size):
130 return self.contents[off:(off+size)]
133 class ObjectFile(StringFile):
134 '''Wrap a dict as a serialized json object.'''
136 def __init__(self, parent_inode, obj):
137 super(ObjectFile, self).__init__(parent_inode, "", 0)
138 self.uuid = obj['uuid']
141 def update(self, obj):
142 self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
143 self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
146 class Directory(FreshBase):
147 '''Generic directory object, backed by a dict.
148 Consists of a set of entries with the key representing the filename
149 and the value referencing a File or Directory object.
152 def __init__(self, parent_inode):
153 super(Directory, self).__init__()
155 '''parent_inode is the integer inode number'''
157 if not isinstance(parent_inode, int):
158 raise Exception("parent_inode should be an int")
159 self.parent_inode = parent_inode
161 self._mtime = time.time()
163 # Overridden by subclasses to implement logic to update the entries dict
164 # when the directory is stale
168 # Only used when computing the size of the disk footprint of the directory
173 def checkupdate(self):
177 except apiclient.errors.HttpError as e:
180 def __getitem__(self, item):
182 return self._entries[item]
186 return self._entries.items()
190 return self._entries.iterkeys()
192 def __contains__(self, k):
194 return k in self._entries
196 def merge(self, items, fn, same, new_entry):
197 '''Helper method for updating the contents of the directory. Takes a list
198 describing the new contents of the directory, reuses entries that are
199 the same in both the old and new lists, creates new entries, and deletes
200 old entries missing from the new list.
202 items: iterable with new directory contents
204 fn: function to take an entry in 'items' and return the desired file or
205 directory name, or None if this entry should be skipped
207 same: function to compare an existing entry (a File or Directory
208 object) with an entry in the items list to determine whether to keep
211 new_entry: function to create a new directory entry (File or Directory
212 object) from an entry in the items list.
216 oldentries = self._entries
220 name = sanitize_filename(fn(i))
222 if name in oldentries and same(oldentries[name], i):
223 # move existing directory entry over
224 self._entries[name] = oldentries[name]
227 # create new directory entry
230 self._entries[name] = self.inodes.add_entry(ent)
233 # delete any other directory entries that were not found in 'items'
235 llfuse.invalidate_entry(self.inode, str(i))
236 self.inodes.del_entry(oldentries[i])
240 self._mtime = time.time()
245 '''Delete all entries'''
246 oldentries = self._entries
249 if isinstance(n, Directory):
251 llfuse.invalidate_entry(self.inode, str(n))
252 self.inodes.del_entry(oldentries[n])
253 llfuse.invalidate_inode(self.inode)
260 class CollectionDirectory(Directory):
261 '''Represents the root of a directory tree holding a collection.'''
263 def __init__(self, parent_inode, inodes, api, num_retries, collection):
264 super(CollectionDirectory, self).__init__(parent_inode)
267 self.num_retries = num_retries
268 self.collection_object_file = None
269 self.collection_object = None
270 if isinstance(collection, dict):
271 self.collection_locator = collection['uuid']
272 self._mtime = convertTime(collection.get('modified_at'))
274 self.collection_locator = collection
278 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
280 # Used by arv-web.py to switch the contents of the CollectionDirectory
281 def change_collection(self, new_locator):
282 """Switch the contents of the CollectionDirectory. Must be called with llfuse.lock held."""
283 self.collection_locator = new_locator
284 self.collection_object = None
287 def new_collection(self, new_collection_object, coll_reader):
288 self.collection_object = new_collection_object
290 self._mtime = convertTime(self.collection_object.get('modified_at'))
292 if self.collection_object_file is not None:
293 self.collection_object_file.update(self.collection_object)
296 for s in coll_reader.all_streams():
298 for part in s.name().split('/'):
299 if part != '' and part != '.':
300 partname = sanitize_filename(part)
301 if partname not in cwd._entries:
302 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
303 cwd = cwd._entries[partname]
304 for k, v in s.files().items():
305 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
309 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
312 if self.collection_locator is None:
316 with llfuse.lock_released:
317 coll_reader = arvados.CollectionReader(
318 self.collection_locator, self.api, self.api.keep,
319 num_retries=self.num_retries)
320 new_collection_object = coll_reader.api_response() or {}
321 # If the Collection only exists in Keep, there will be no API
322 # response. Fill in the fields we need.
323 if 'uuid' not in new_collection_object:
324 new_collection_object['uuid'] = self.collection_locator
325 if "portable_data_hash" not in new_collection_object:
326 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
327 if 'manifest_text' not in new_collection_object:
328 new_collection_object['manifest_text'] = coll_reader.manifest_text()
329 coll_reader.normalize()
330 # end with llfuse.lock_released, re-acquire lock
332 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
333 self.new_collection(new_collection_object, coll_reader)
337 except arvados.errors.NotFoundError:
338 _logger.exception("arv-mount %s: error", self.collection_locator)
339 except arvados.errors.ArgumentError as detail:
340 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
341 if self.collection_object is not None and "manifest_text" in self.collection_object:
342 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
344 _logger.exception("arv-mount %s: error", self.collection_locator)
345 if self.collection_object is not None and "manifest_text" in self.collection_object:
346 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
349 def __getitem__(self, item):
351 if item == '.arvados#collection':
352 if self.collection_object_file is None:
353 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
354 self.inodes.add_entry(self.collection_object_file)
355 return self.collection_object_file
357 return super(CollectionDirectory, self).__getitem__(item)
359 def __contains__(self, k):
360 if k == '.arvados#collection':
363 return super(CollectionDirectory, self).__contains__(k)
366 class MagicDirectory(Directory):
367 '''A special directory that logically contains the set of all extant keep
368 locators. When a file is referenced by lookup(), it is tested to see if it
369 is a valid keep locator to a manifest, and if so, loads the manifest
370 contents as a subdirectory of this directory with the locator as the
371 directory name. Since querying a list of all extant keep locators is
372 impractical, only collections that have already been accessed are visible
377 This directory provides access to Arvados collections as subdirectories listed
378 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
379 the form '1234567890abcdefghijklmnopqrstuv+123').
381 Note that this directory will appear empty until you attempt to access a
382 specific collection subdirectory (such as trying to 'cd' into it), at which
383 point the collection will actually be looked up on the server and the directory
384 will appear if it exists.
387 def __init__(self, parent_inode, inodes, api, num_retries):
388 super(MagicDirectory, self).__init__(parent_inode)
391 self.num_retries = num_retries
393 def __setattr__(self, name, value):
394 super(MagicDirectory, self).__setattr__(name, value)
395 # When we're assigned an inode, add a README.
396 if ((name == 'inode') and (self.inode is not None) and
397 (not self._entries)):
398 self._entries['README'] = self.inodes.add_entry(
399 StringFile(self.inode, self.README_TEXT, time.time()))
400 # If we're the root directory, add an identical by_id subdirectory.
401 if self.inode == llfuse.ROOT_INODE:
402 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
403 self.inode, self.inodes, self.api, self.num_retries))
405 def __contains__(self, k):
406 if k in self._entries:
409 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
413 e = self.inodes.add_entry(CollectionDirectory(
414 self.inode, self.inodes, self.api, self.num_retries, k))
420 except Exception as e:
421 _logger.debug('arv-mount exception keep %s', e)
424 def __getitem__(self, item):
426 return self._entries[item]
428 raise KeyError("No collection with id " + item)
431 class RecursiveInvalidateDirectory(Directory):
432 def invalidate(self):
433 if self.inode == llfuse.ROOT_INODE:
434 llfuse.lock.acquire()
436 super(RecursiveInvalidateDirectory, self).invalidate()
437 for a in self._entries:
438 self._entries[a].invalidate()
442 if self.inode == llfuse.ROOT_INODE:
443 llfuse.lock.release()
446 class TagsDirectory(RecursiveInvalidateDirectory):
447 '''A special directory that contains as subdirectories all tags visible to the user.'''
449 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
450 super(TagsDirectory, self).__init__(parent_inode)
453 self.num_retries = num_retries
455 self._poll_time = poll_time
458 with llfuse.lock_released:
459 tags = self.api.links().list(
460 filters=[['link_class', '=', 'tag']],
461 select=['name'], distinct=True
462 ).execute(num_retries=self.num_retries)
464 self.merge(tags['items'],
466 lambda a, i: a.tag == i['name'],
467 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
470 class TagDirectory(Directory):
471 '''A special directory that contains as subdirectories all collections visible
472 to the user that are tagged with a particular tag.
475 def __init__(self, parent_inode, inodes, api, num_retries, tag,
476 poll=False, poll_time=60):
477 super(TagDirectory, self).__init__(parent_inode)
480 self.num_retries = num_retries
483 self._poll_time = poll_time
486 with llfuse.lock_released:
487 taggedcollections = self.api.links().list(
488 filters=[['link_class', '=', 'tag'],
489 ['name', '=', self.tag],
490 ['head_uuid', 'is_a', 'arvados#collection']],
492 ).execute(num_retries=self.num_retries)
493 self.merge(taggedcollections['items'],
494 lambda i: i['head_uuid'],
495 lambda a, i: a.collection_locator == i['head_uuid'],
496 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
499 class ProjectDirectory(Directory):
500 '''A special directory that contains the contents of a project.'''
502 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
503 poll=False, poll_time=60):
504 super(ProjectDirectory, self).__init__(parent_inode)
507 self.num_retries = num_retries
508 self.project_object = project_object
509 self.project_object_file = None
510 self.uuid = project_object['uuid']
512 self._poll_time = poll_time
514 def createDirectory(self, i):
515 if collection_uuid_pattern.match(i['uuid']):
516 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
517 elif group_uuid_pattern.match(i['uuid']):
518 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
519 elif link_uuid_pattern.match(i['uuid']):
520 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
521 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
524 elif uuid_pattern.match(i['uuid']):
525 return ObjectFile(self.parent_inode, i)
530 if self.project_object_file == None:
531 self.project_object_file = ObjectFile(self.inode, self.project_object)
532 self.inodes.add_entry(self.project_object_file)
536 if i['name'] is None or len(i['name']) == 0:
538 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
539 # collection or subproject
541 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
544 elif 'kind' in i and i['kind'].startswith('arvados#'):
546 return "{}.{}".format(i['name'], i['kind'][8:])
551 if isinstance(a, CollectionDirectory):
552 return a.collection_locator == i['uuid']
553 elif isinstance(a, ProjectDirectory):
554 return a.uuid == i['uuid']
555 elif isinstance(a, ObjectFile):
556 return a.uuid == i['uuid'] and not a.stale()
559 with llfuse.lock_released:
560 if group_uuid_pattern.match(self.uuid):
561 self.project_object = self.api.groups().get(
562 uuid=self.uuid).execute(num_retries=self.num_retries)
563 elif user_uuid_pattern.match(self.uuid):
564 self.project_object = self.api.users().get(
565 uuid=self.uuid).execute(num_retries=self.num_retries)
567 contents = arvados.util.list_all(self.api.groups().contents,
568 self.num_retries, uuid=self.uuid)
569 # Name links will be obsolete soon; take this out when nothing pre-#3036 remains in use.
570 contents += arvados.util.list_all(
571 self.api.links().list, self.num_retries,
572 filters=[['tail_uuid', '=', self.uuid],
573 ['link_class', '=', 'name']])
575 # end with llfuse.lock_released, re-acquire lock
580 self.createDirectory)
582 def __getitem__(self, item):
584 if item == '.arvados#project':
585 return self.project_object_file
587 return super(ProjectDirectory, self).__getitem__(item)
589 def __contains__(self, k):
590 if k == '.arvados#project':
593 return super(ProjectDirectory, self).__contains__(k)
596 class SharedDirectory(Directory):
597 '''A special directory that represents users or groups who have shared projects with me.'''
599 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
600 poll=False, poll_time=60):
601 super(SharedDirectory, self).__init__(parent_inode)
604 self.num_retries = num_retries
605 self.current_user = api.users().current().execute(num_retries=num_retries)
607 self._poll_time = poll_time
610 with llfuse.lock_released:
611 all_projects = arvados.util.list_all(
612 self.api.groups().list, self.num_retries,
613 filters=[['group_class','=','project']])
615 for ob in all_projects:
616 objects[ob['uuid']] = ob
620 for ob in all_projects:
621 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
623 root_owners[ob['owner_uuid']] = True
625 lusers = arvados.util.list_all(
626 self.api.users().list, self.num_retries,
627 filters=[['uuid','in', list(root_owners)]])
628 lgroups = arvados.util.list_all(
629 self.api.groups().list, self.num_retries,
630 filters=[['uuid','in', list(root_owners)]])
636 objects[l["uuid"]] = l
638 objects[l["uuid"]] = l
641 for r in root_owners:
645 contents[obr["name"]] = obr
646 if "first_name" in obr:
647 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
650 if r['owner_uuid'] not in objects:
651 contents[r['name']] = r
653 # end with llfuse.lock_released, re-acquire lock
656 self.merge(contents.items(),
658 lambda a, i: a.uuid == i[1]['uuid'],
659 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
664 class FileHandle(object):
665 '''Connects a numeric file handle to a File or Directory object that has
666 been opened by the client.'''
668 def __init__(self, fh, entry):
673 class Inodes(object):
674 '''Manage the set of inodes. This is the mapping from a numeric id
675 to a concrete File or Directory object'''
679 self._counter = itertools.count(llfuse.ROOT_INODE)
681 def __getitem__(self, item):
682 return self._entries[item]
684 def __setitem__(self, key, item):
685 self._entries[key] = item
688 return self._entries.iterkeys()
691 return self._entries.items()
693 def __contains__(self, k):
694 return k in self._entries
696 def add_entry(self, entry):
697 entry.inode = next(self._counter)
698 self._entries[entry.inode] = entry
701 def del_entry(self, entry):
702 llfuse.invalidate_inode(entry.inode)
703 del self._entries[entry.inode]
705 class Operations(llfuse.Operations):
706 '''This is the main interface with llfuse. The methods on this object are
707 called by llfuse threads to service FUSE events to query and read from
710 llfuse has its own global lock which is acquired before calling a request handler,
711 so request handlers do not run concurrently unless the lock is explicitly released
712 using "with llfuse.lock_released:"'''
714 def __init__(self, uid, gid, encoding="utf-8"):
715 super(Operations, self).__init__()
717 self.inodes = Inodes()
720 self.encoding = encoding
722 # dict of inode to filehandle
723 self._filehandles = {}
724 self._filehandles_counter = 1
726 # Other threads that need to wait until the fuse driver
727 # is fully initialized should wait() on this event object.
728 self.initlock = threading.Event()
731 # Allow threads that are waiting for the driver to finish
732 # initializing to continue.
735 def access(self, inode, mode, ctx):
738 def getattr(self, inode):
739 if inode not in self.inodes:
740 raise llfuse.FUSEError(errno.ENOENT)
742 e = self.inodes[inode]
744 entry = llfuse.EntryAttributes()
747 entry.entry_timeout = 300
748 entry.attr_timeout = 300
750 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
751 if isinstance(e, Directory):
752 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
753 elif isinstance(e, StreamReaderFile):
754 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
756 entry.st_mode |= stat.S_IFREG
759 entry.st_uid = self.uid
760 entry.st_gid = self.gid
763 entry.st_size = e.size()
765 entry.st_blksize = 512
766 entry.st_blocks = (e.size()/512)+1
767 entry.st_atime = int(e.atime())
768 entry.st_mtime = int(e.mtime())
769 entry.st_ctime = int(e.mtime())
773 def lookup(self, parent_inode, name):
774 name = unicode(name, self.encoding)
775 _logger.debug("arv-mount lookup: parent_inode %i name %s",
782 if parent_inode in self.inodes:
783 p = self.inodes[parent_inode]
785 inode = p.parent_inode
786 elif isinstance(p, Directory) and name in p:
787 inode = p[name].inode
790 return self.getattr(inode)
792 raise llfuse.FUSEError(errno.ENOENT)
794 def open(self, inode, flags):
795 if inode in self.inodes:
796 p = self.inodes[inode]
798 raise llfuse.FUSEError(errno.ENOENT)
800 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
801 raise llfuse.FUSEError(errno.EROFS)
803 if isinstance(p, Directory):
804 raise llfuse.FUSEError(errno.EISDIR)
806 fh = self._filehandles_counter
807 self._filehandles_counter += 1
808 self._filehandles[fh] = FileHandle(fh, p)
811 def read(self, fh, off, size):
812 _logger.debug("arv-mount read %i %i %i", fh, off, size)
813 if fh in self._filehandles:
814 handle = self._filehandles[fh]
816 raise llfuse.FUSEError(errno.EBADF)
819 handle.entry._atime = time.time()
822 with llfuse.lock_released:
823 return handle.entry.readfrom(off, size)
824 except arvados.errors.NotFoundError as e:
825 _logger.warning("Block not found: " + str(e))
826 raise llfuse.FUSEError(errno.EIO)
829 raise llfuse.FUSEError(errno.EIO)
831 def release(self, fh):
832 if fh in self._filehandles:
833 del self._filehandles[fh]
835 def opendir(self, inode):
836 _logger.debug("arv-mount opendir: inode %i", inode)
838 if inode in self.inodes:
839 p = self.inodes[inode]
841 raise llfuse.FUSEError(errno.ENOENT)
843 if not isinstance(p, Directory):
844 raise llfuse.FUSEError(errno.ENOTDIR)
846 fh = self._filehandles_counter
847 self._filehandles_counter += 1
848 if p.parent_inode in self.inodes:
849 parent = self.inodes[p.parent_inode]
851 raise llfuse.FUSEError(errno.EIO)
854 p._atime = time.time()
856 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
859 def readdir(self, fh, off):
860 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
862 if fh in self._filehandles:
863 handle = self._filehandles[fh]
865 raise llfuse.FUSEError(errno.EBADF)
867 _logger.debug("arv-mount handle.entry %s", handle.entry)
870 while e < len(handle.entry):
871 if handle.entry[e][1].inode in self.inodes:
873 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
874 except UnicodeEncodeError:
878 def releasedir(self, fh):
879 del self._filehandles[fh]
882 st = llfuse.StatvfsData()
883 st.f_bsize = 64 * 1024
896 # The llfuse documentation recommends only overloading functions that
897 # are actually implemented, as the default implementation will raise ENOSYS.
898 # However, there is a bug in the llfuse default implementation of create()
899 # "create() takes exactly 5 positional arguments (6 given)" which will crash
901 # The workaround is to implement it with the proper number of parameters,
902 # and then everything works out.
903 def create(self, inode_parent, name, mode, flags, ctx):
904 raise llfuse.FUSEError(errno.EROFS)