2 # FUSE driver for Arvados Keep
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# Module-wide logger; the classes below report through 'arvados.arvados_fuse'.
26 _logger = logging.getLogger('arvados.arvados_fuse')
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
# The character class contains exactly two characters: NUL and "/".
31 _disallowed_filename_characters = re.compile('[\x00/]')
# Timestamp helper (presumably the convertTime() that other code in this
# file calls -- TODO confirm).  Turns a "YYYY-MM-DDTHH:MM:SSZ" string `t`
# into seconds since the epoch.
34 """Parse Arvados timestamp to unix time."""
# strptime() produces a struct_time; calendar.timegm() reads it as UTC.
38 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
# strptime() raises TypeError for a non-string and ValueError for a string
# that does not match the format; both are handled here.
39 except (TypeError, ValueError):
# Make a name usable as a single path component of the mount.
#   dirty: the raw name (e.g. a collection file or stream component).
42 def sanitize_filename(dirty):
43 '''Replace disallowed filename characters with harmless "_".'''
# Every match of _disallowed_filename_characters (NUL or "/") is replaced
# by one underscore; all other characters are kept.
53 return _disallowed_filename_characters.sub('_', dirty)
# FreshBase keeps two timestamps -- _last_update and _atime -- that are
# compared against _poll_time to decide whether an object needs refreshing.
56 class FreshBase(object):
57 '''Base class for maintaining fresh/stale state to determine when to update.'''
# Both timestamps start out as the current time (seconds since the epoch).
61 self._last_update = time.time()
62 self._atime = time.time()
65 # Mark the value as stale
69 # Test if the entries dict is stale.
# True when the last access is later than the last update by more than
# _poll_time seconds.
74 return (self._last_update + self._poll_time) < self._atime
# Record the current time as the last-update timestamp.
79 self._last_update = time.time()
# File: common base of the file-like nodes in this module.
84 class File(FreshBase):
85 '''Base for file objects.'''
# parent_inode: inode number of the containing directory (callers in this
#   file pass the directory's .inode).
# _mtime: modification time; defaults to 0.
87 def __init__(self, parent_inode, _mtime=0):
88 super(File, self).__init__()
90 self.parent_inode = parent_inode
# Subclasses in this file override readfrom() to return the data found at
# offset `off`, limited by `size`.
96 def readfrom(self, off, size):
# StreamReaderFile: file node whose size and data come from a reader object.
103 class StreamReaderFile(File):
104 '''Wraps a StreamFileReader as a file.'''
# reader: object providing size() and readfrom(off, size).
106 def __init__(self, parent_inode, reader, _mtime):
107 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
# The file size is whatever the wrapped reader reports.
111 return self.reader.size()
# Reads are delegated unchanged to the wrapped reader.
113 def readfrom(self, off, size):
114 return self.reader.readfrom(off, size)
# StringFile: file node whose whole content is held in self.contents.
120 class StringFile(File):
121 '''Wrap a simple string as a file'''
# contents: the string served as the file body.
122 def __init__(self, parent_inode, contents, _mtime):
123 super(StringFile, self).__init__(parent_inode, _mtime)
124 self.contents = contents
# Size is the length of the stored string.
127 return len(self.contents)
# Slicing never raises for out-of-range bounds, so a read at or past the
# end yields a short or empty string.
129 def readfrom(self, off, size):
130 return self.contents[off:(off+size)]
# ObjectFile: StringFile whose contents are the JSON rendering of a dict
# (an API record carrying at least a 'uuid' key).
133 class ObjectFile(StringFile):
134 '''Wrap a dict as a serialized json object.'''
136 def __init__(self, parent_inode, obj):
# Start with empty contents and mtime 0; update() supplies the real values.
137 super(ObjectFile, self).__init__(parent_inode, "", 0)
138 self.uuid = obj['uuid']
# Refresh the mtime and the serialized contents from `obj`.
141 def update(self, obj):
# Records without a 'modified_at' key get mtime 0.
142 self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
# Output is 4-space indented with sorted keys and ends with a newline.
143 self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
# Directory: directory node backed by the self._entries dict (name -> File
# or Directory).  The code below validates that parent_inode is an int,
# exposes the dict through __getitem__/__contains__ and the items/iterkeys
# accessors, reconciles the entries against a fresh listing in merge(), and
# provides a routine that deletes every entry.
146 class Directory(FreshBase):
147 '''Generic directory object, backed by a dict.
148 Consists of a set of entries with the key representing the filename
149 and the value referencing a File or Directory object.
152 def __init__(self, parent_inode):
153 super(Directory, self).__init__()
155 '''parent_inode is the integer inode number'''
157 if not isinstance(parent_inode, int):
158 raise Exception("parent_inode should be an int")
159 self.parent_inode = parent_inode
161 self._mtime = time.time()
163 # Overriden by subclasses to implement logic to update the entries dict
164 # when the directory is stale
168 # Only used when computing the size of the disk footprint of the directory
173 def checkupdate(self):
177 except apiclient.errors.HttpError as e:
180 def __getitem__(self, item):
182 return self._entries[item]
186 return self._entries.items()
190 return self._entries.iterkeys()
192 def __contains__(self, k):
194 return k in self._entries
196 def merge(self, items, fn, same, new_entry):
197 '''Helper method for updating the contents of the directory. Takes a list
198 describing the new contents of the directory, reuse entries that are
199 the same in both the old and new lists, create new entries, and delete
200 old entries missing from the new list.
202 items: iterable with new directory contents
204 fn: function to take an entry in 'items' and return the desired file or
205 directory name, or None if this entry should be skipped
207 same: function to compare an existing entry (a File or Directory
208 object) with an entry in the items list to determine whether to keep
211 new_entry: function to create a new directory entry (File or Directory
212 object) from an entry in the items list.
# Keep a handle on the previous entries so survivors can be carried over.
216 oldentries = self._entries
# fn(i) supplies the raw name; sanitize_filename() makes it path-safe.
220 name = sanitize_filename(fn(i))
# Reuse the existing node only if same() confirms it still matches item i.
222 if name in oldentries and same(oldentries[name], i):
223 # move existing directory entry over
224 self._entries[name] = oldentries[name]
227 # create new directory entry
# add_entry() assigns the new node an inode number and returns it.
230 self._entries[name] = self.inodes.add_entry(ent)
233 # delete any other directory entries that were not in found in 'items'
# Invalidate the stale name under this directory through llfuse, then drop
# the node from the inode table.
235 llfuse.invalidate_entry(self.inode, str(i))
236 self.inodes.del_entry(oldentries[i])
# Stamp the directory's modification time with the current time.
240 self._mtime = time.time()
245 '''Delete all entries'''
246 oldentries = self._entries
# Directory children are singled out before removal -- presumably so they
# can be emptied recursively; TODO confirm.
249 if isinstance(n, Directory):
251 llfuse.invalidate_entry(self.inode, str(n))
252 self.inodes.del_entry(oldentries[n])
# Finally invalidate this directory's own inode through llfuse.
253 llfuse.invalidate_inode(self.inode)
# CollectionDirectory: directory node whose entries mirror one collection,
# identified by self.collection_locator.  The matching code below accepts
# either a uuid or a portable data hash as that locator.
260 class CollectionDirectory(Directory):
261 '''Represents the root of a directory tree holding a collection.'''
# collection: either an API record (dict) or a bare locator string.
263 def __init__(self, parent_inode, inodes, api, num_retries, collection):
264 super(CollectionDirectory, self).__init__(parent_inode)
267 self.num_retries = num_retries
# Created lazily by __getitem__ when '.arvados#collection' is requested.
268 self.collection_object_file = None
269 self.collection_object = None
270 if isinstance(collection, dict):
# A record supplies both its uuid and its modification timestamp.
271 self.collection_locator = collection['uuid']
272 self._mtime = convertTime(collection.get('modified_at'))
# Otherwise the argument itself is used as the locator.
274 self.collection_locator = collection
# An item matches this directory when its uuid or its portable data hash
# equals our locator.
278 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
280 # Used by arv-web.py to switch the contents of the CollectionDirectory
281 def change_collection(self, new_locator):
282 """Switch the contents of the CollectionDirectory. Must be called with llfuse.lock held."""
283 self.collection_locator = new_locator
# With no cached record, the comparison in the update code below always
# chooses to rebuild.
284 self.collection_object = None
# Install a freshly fetched record and add directory entries for every
# stream and file that coll_reader reports.
287 def new_collection(self, new_collection_object, coll_reader):
288 self.collection_object = new_collection_object
290 self._mtime = convertTime(self.collection_object.get('modified_at'))
# Keep an already-exposed '.arvados#collection' file in sync.
292 if self.collection_object_file is not None:
293 self.collection_object_file.update(self.collection_object)
296 for s in coll_reader.all_streams():
# A stream name is a '/'-separated path; walk it one component at a time.
298 for part in s.name().split('/'):
# Empty components and '.' do not name a sub-directory.
299 if part != '' and part != '.':
300 partname = sanitize_filename(part)
# Create the intermediate Directory on first use.
301 if partname not in cwd._entries:
302 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
303 cwd = cwd._entries[partname]
# Register each file of the stream in the directory reached above; the
# files take this collection directory's mtime().
304 for k, v in s.files().items():
305 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
# Already loaded and addressed by portable data hash -- presumably nothing
# to refresh, since content-addressed data cannot change; TODO confirm.
309 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
# Guard for a directory that has no locator.
312 if self.collection_locator is None:
# The fetch runs with the global llfuse lock released so that other request
# handlers are not blocked meanwhile.
316 with llfuse.lock_released:
317 coll_reader = arvados.CollectionReader(
318 self.collection_locator, self.api, self.api.localkeep(),
319 num_retries=self.num_retries)
# A falsy api_response() is replaced by an empty dict and filled in below.
320 new_collection_object = coll_reader.api_response() or {}
321 # If the Collection only exists in Keep, there will be no API
322 # response. Fill in the fields we need.
323 if 'uuid' not in new_collection_object:
324 new_collection_object['uuid'] = self.collection_locator
325 if "portable_data_hash" not in new_collection_object:
326 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
327 if 'manifest_text' not in new_collection_object:
328 new_collection_object['manifest_text'] = coll_reader.manifest_text()
329 coll_reader.normalize()
330 # end with llfuse.lock_released, re-acquire lock
# Rebuild only when nothing is loaded yet or the portable data hash changed.
332 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
333 self.new_collection(new_collection_object, coll_reader)
# Failures are logged, not propagated, by the handlers visible here.
337 except arvados.errors.NotFoundError:
338 _logger.exception("arv-mount %s: error", self.collection_locator)
339 except arvados.errors.ArgumentError as detail:
340 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
# Include the previously loaded manifest (if any) to help diagnosis.
341 if self.collection_object is not None and "manifest_text" in self.collection_object:
342 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
344 _logger.exception("arv-mount %s: error", self.collection_locator)
345 if self.collection_object is not None and "manifest_text" in self.collection_object:
346 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
# Lookup that also serves the virtual '.arvados#collection' entry.
349 def __getitem__(self, item):
351 if item == '.arvados#collection':
352 if self.collection_object_file is None:
# NOTE(review): ObjectFile reads obj['uuid'], so self.collection_object must
# not be None at this point -- confirm an update is guaranteed beforehand.
353 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
354 self.inodes.add_entry(self.collection_object_file)
355 return self.collection_object_file
# Every other name is resolved by the base Directory.
357 return super(CollectionDirectory, self).__getitem__(item)
# Membership test with a special case for '.arvados#collection'.
359 def __contains__(self, k):
360 if k == '.arvados#collection':
363 return super(CollectionDirectory, self).__contains__(k)
# MagicDirectory: directory whose collection sub-directories are created on
# demand, the first time a name shaped like a uuid or portable data hash is
# tested for membership.
366 class MagicDirectory(Directory):
367 '''A special directory that logically contains the set of all extant keep
368 locators. When a file is referenced by lookup(), it is tested to see if it
369 is a valid keep locator to a manifest, and if so, loads the manifest
370 contents as a subdirectory of this directory with the locator as the
371 directory name. Since querying a list of all extant keep locators is
372 impractical, only collections that have already been accessed are visible
377 This directory provides access to Arvados collections as subdirectories listed
378 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
379 the form '1234567890abcdefghijklmnopqrstuv+123').
381 Note that this directory will appear empty until you attempt to access a
382 specific collection subdirectory (such as trying to 'cd' into it), at which
383 point the collection will actually be looked up on the server and the directory
384 will appear if it exists.
387 def __init__(self, parent_inode, inodes, api, num_retries):
388 super(MagicDirectory, self).__init__(parent_inode)
391 self.num_retries = num_retries
# Attribute hook: the README entry can only be created once this directory
# has its own inode number, so creation is triggered by the assignment of
# `inode` (made by Inodes.add_entry).
393 def __setattr__(self, name, value):
394 super(MagicDirectory, self).__setattr__(name, value)
395 # When we're assigned an inode, add a README.
# Populate only once: the entries dict must still be empty.
396 if ((name == 'inode') and (self.inode is not None) and
397 (not self._entries)):
398 self._entries['README'] = self.inodes.add_entry(
399 StringFile(self.inode, self.README_TEXT, time.time()))
400 # If we're the root directory, add an identical by_id subdirectory.
401 if self.inode == llfuse.ROOT_INODE:
402 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
403 self.inode, self.inodes, self.api, self.num_retries))
# Membership test that may create a CollectionDirectory for `k`.
405 def __contains__(self, k):
# Names that are already known need no further work.
406 if k in self._entries:
# Only names shaped like a portable data hash or a uuid are candidates.
409 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
# Build a directory for the candidate locator and register it as an inode.
413 e = self.inodes.add_entry(CollectionDirectory(
414 self.inode, self.inodes, self.api, self.num_retries, k))
# Any failure is logged at debug level only.
420 except Exception as e:
421 _logger.debug('arv-mount exception keep %s', e)
# Return a known entry, or raise KeyError naming the requested id.
424 def __getitem__(self, item):
426 return self._entries[item]
428 raise KeyError("No collection with id " + item)
# RecursiveInvalidateDirectory: Directory whose invalidate() also
# invalidates every child entry.
431 class RecursiveInvalidateDirectory(Directory):
432 def invalidate(self):
# Only the root directory takes the global llfuse lock -- presumably nested
# calls run while the root's call already holds it; TODO confirm.
433 if self.inode == llfuse.ROOT_INODE:
434 llfuse.lock.acquire()
436 super(RecursiveInvalidateDirectory, self).invalidate()
# Propagate to each child, whatever its type.
437 for a in self._entries:
438 self._entries[a].invalidate()
# Counterpart of the acquire above, under the same root-only condition.
442 if self.inode == llfuse.ROOT_INODE:
443 llfuse.lock.release()
# TagsDirectory: lists one TagDirectory per distinct tag link name returned
# by the API.
446 class TagsDirectory(RecursiveInvalidateDirectory):
447 '''A special directory that contains as subdirectories all tags visible to the user.'''
# poll_time: refresh interval in seconds, stored as _poll_time (the value
#   FreshBase compares against); defaults to 60.
449 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
450 super(TagsDirectory, self).__init__(parent_inode)
453 self.num_retries = num_retries
455 self._poll_time = poll_time
# The API request runs with the global llfuse lock released.
458 with llfuse.lock_released:
# Ask only for the distinct 'name' values of links whose class is 'tag'.
459 tags = self.api.links().list(
460 filters=[['link_class', '=', 'tag']],
461 select=['name'], distinct=True
462 ).execute(num_retries=self.num_retries)
# Reconcile the entries with the listing: an existing TagDirectory is kept
# when its tag equals the item's name, otherwise a new one is created.
464 self.merge(tags['items'],
466 lambda a, i: a.tag == i['name'],
467 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
# TagDirectory: lists one CollectionDirectory per collection that carries
# the tag self.tag.
470 class TagDirectory(Directory):
471 '''A special directory that contains as subdirectories all collections visible
472 to the user that are tagged with a particular tag.
# tag: the tag name used to filter links.
# poll / poll_time: polling switch and interval in seconds (defaults:
#   False / 60).
475 def __init__(self, parent_inode, inodes, api, num_retries, tag,
476 poll=False, poll_time=60):
477 super(TagDirectory, self).__init__(parent_inode)
480 self.num_retries = num_retries
483 self._poll_time = poll_time
# The API request runs with the global llfuse lock released.
486 with llfuse.lock_released:
# Select tag links with this name whose head is a collection.
487 taggedcollections = self.api.links().list(
488 filters=[['link_class', '=', 'tag'],
489 ['name', '=', self.tag],
490 ['head_uuid', 'is_a', 'arvados#collection']],
492 ).execute(num_retries=self.num_retries)
# Entries are named after the link's head_uuid; an existing directory is
# reused when its locator equals that uuid.
493 self.merge(taggedcollections['items'],
494 lambda i: i['head_uuid'],
495 lambda a, i: a.collection_locator == i['head_uuid'],
496 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
# ProjectDirectory: directory node listing the contents of one project, or
# of one user's home, since the refresh code accepts both group and user
# uuids.
499 class ProjectDirectory(Directory):
500 '''A special directory that contains the contents of a project.'''
# project_object: API record of the owner; its 'uuid' is kept as self.uuid.
# poll / poll_time: polling switch and interval in seconds (defaults:
#   False / 60).
502 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
503 poll=False, poll_time=60):
504 super(ProjectDirectory, self).__init__(parent_inode)
507 self.num_retries = num_retries
508 self.project_object = project_object
509 self.project_object_file = None
510 self.uuid = project_object['uuid']
512 self._poll_time = poll_time
# Factory: choose the node type for listing item `i` from the shape of its
# uuid.
514 def createDirectory(self, i):
515 if collection_uuid_pattern.match(i['uuid']):
516 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
517 elif group_uuid_pattern.match(i['uuid']):
# Sub-projects inherit this directory's polling settings.
518 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
519 elif link_uuid_pattern.match(i['uuid']):
# A link is followed only when its head is a collection (by kind or by a
# portable-data-hash shaped head_uuid).
520 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
521 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
524 elif uuid_pattern.match(i['uuid']):
# NOTE(review): this branch passes self.parent_inode while the sibling
# branches pass self.inode -- confirm which parent is intended.
525 return ObjectFile(self.parent_inode, i)
# NOTE(review): `is None` is the idiomatic comparison here.
530 if self.project_object_file == None:
531 self.project_object_file = ObjectFile(self.inode, self.project_object)
532 self.inodes.add_entry(self.project_object_file)
# Naming rules for listing items follow.
536 if i['name'] is None or len(i['name']) == 0:
538 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
539 # collection or subproject
541 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
544 elif 'kind' in i and i['kind'].startswith('arvados#'):
# [8:] drops the 8-character 'arvados#' prefix, giving "<name>.<kind>".
546 return "{}.{}".format(i['name'], i['kind'][8:])
# Identity rules deciding whether an existing node `a` still represents
# item `i`.
551 if isinstance(a, CollectionDirectory):
552 return a.collection_locator == i['uuid']
553 elif isinstance(a, ProjectDirectory):
554 return a.uuid == i['uuid']
555 elif isinstance(a, ObjectFile):
# A stale ObjectFile is treated as different, so it is not reused.
556 return a.uuid == i['uuid'] and not a.stale()
# The API requests run with the global llfuse lock released.
559 with llfuse.lock_released:
# Re-fetch the owner record through the endpoint matching its uuid type.
560 if group_uuid_pattern.match(self.uuid):
561 self.project_object = self.api.groups().get(
562 uuid=self.uuid).execute(num_retries=self.num_retries)
563 elif user_uuid_pattern.match(self.uuid):
564 self.project_object = self.api.users().get(
565 uuid=self.uuid).execute(num_retries=self.num_retries)
# list_all() is handed the groups().contents method plus the owner uuid.
567 contents = arvados.util.list_all(self.api.groups().contents,
568 self.num_retries, uuid=self.uuid)
570 # end with llfuse.lock_released, re-acquire lock
575 self.createDirectory)
# Lookup that also serves the virtual '.arvados#project' entry.
577 def __getitem__(self, item):
579 if item == '.arvados#project':
580 return self.project_object_file
582 return super(ProjectDirectory, self).__getitem__(item)
# Membership test with a special case for '.arvados#project'.
584 def __contains__(self, k):
585 if k == '.arvados#project':
588 return super(ProjectDirectory, self).__contains__(k)
# SharedDirectory: lists, as ProjectDirectory entries, the owners and
# projects derived from the project listing below.
591 class SharedDirectory(Directory):
592 '''A special directory that represents users or groups who have shared projects with me.'''
# poll / poll_time: polling switch and interval in seconds (defaults:
#   False / 60).
594 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
595 poll=False, poll_time=60):
596 super(SharedDirectory, self).__init__(parent_inode)
599 self.num_retries = num_retries
# NOTE(review): this performs an API request inside the constructor.
600 self.current_user = api.users().current().execute(num_retries=num_retries)
602 self._poll_time = poll_time
# The API requests run with the global llfuse lock released.
605 with llfuse.lock_released:
# Fetch every group whose class is 'project'.
606 all_projects = arvados.util.list_all(
607 self.api.groups().list, self.num_retries,
608 filters=[['group_class','=','project']])
# Index the projects by uuid.
610 for ob in all_projects:
611 objects[ob['uuid']] = ob
# Collect owner uuids that are neither the current user nor one of the
# listed projects.
615 for ob in all_projects:
616 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
618 root_owners[ob['owner_uuid']] = True
# An owner uuid may denote a user or a group, so both endpoints are asked.
620 lusers = arvados.util.list_all(
621 self.api.users().list, self.num_retries,
622 filters=[['uuid','in', list(root_owners)]])
623 lgroups = arvados.util.list_all(
624 self.api.groups().list, self.num_retries,
625 filters=[['uuid','in', list(root_owners)]])
# Add the fetched records to the uuid index.
631 objects[l["uuid"]] = l
633 objects[l["uuid"]] = l
636 for r in root_owners:
# Records with a "name" are listed under it; records with a "first_name"
# (presumably user records -- TODO confirm) are listed as "First Last".
640 contents[obr["name"]] = obr
641 if "first_name" in obr:
642 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
# Records whose owner is not in the uuid index are listed by name.
645 if r['owner_uuid'] not in objects:
646 contents[r['name']] = r
648 # end with llfuse.lock_released, re-acquire lock
# Items handed to merge() are (name, record) pairs, hence the i[1] lookups.
651 self.merge(contents.items(),
653 lambda a, i: a.uuid == i[1]['uuid'],
654 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
# FileHandle: pairs a handle number with what was opened.  In this file
# Operations.open() stores the opened inode object as `entry`, while
# Operations.opendir() stores a list of (name, object) pairs.
659 class FileHandle(object):
660 '''Connects a numeric file handle to a File or Directory object that has
661 been opened by the client.'''
663 def __init__(self, fh, entry):
# Inodes: table mapping inode numbers to nodes, with dict-style access.
668 class Inodes(object):
669 '''Manage the set of inodes. This is the mapping from a numeric id
670 to a concrete File or Directory object'''
# Numbering starts at llfuse.ROOT_INODE, so the first node registered
# through add_entry() receives the root inode number.
674 self._counter = itertools.count(llfuse.ROOT_INODE)
676 def __getitem__(self, item):
677 return self._entries[item]
679 def __setitem__(self, key, item):
680 self._entries[key] = item
683 return self._entries.iterkeys()
686 return self._entries.items()
688 def __contains__(self, k):
689 return k in self._entries
# Assign the next free inode number to `entry` and register it.
691 def add_entry(self, entry):
692 entry.inode = next(self._counter)
693 self._entries[entry.inode] = entry
# Invalidate the inode through llfuse, then remove it from the table.
# Raises KeyError if the entry's inode is not registered.
696 def del_entry(self, entry):
697 llfuse.invalidate_inode(entry.inode)
698 del self._entries[entry.inode]
# Operations: the llfuse request handlers of this read-only filesystem.
# Nodes live in self.inodes; open files and directories are tracked in
# self._filehandles.  Failures are reported by raising llfuse.FUSEError
# with an errno value.
700 class Operations(llfuse.Operations):
701 '''This is the main interface with llfuse. The methods on this object are
702 called by llfuse threads to service FUSE events to query and read from
705 llfuse has its own global lock which is acquired before calling a request handler,
706 so request handlers do not run concurrently unless the lock is explicitly released
707 using "with llfuse.lock_released:"'''
# encoding: used to decode names passed to lookup() and to encode names
#   returned by readdir(); defaults to "utf-8".
709 def __init__(self, uid, gid, encoding="utf-8"):
710 super(Operations, self).__init__()
712 self.inodes = Inodes()
715 self.encoding = encoding
717 # dict of inode to filehandle
# The keys used by open()/opendir() are handle numbers taken from the
# counter below, starting at 1.
718 self._filehandles = {}
719 self._filehandles_counter = 1
721 # Other threads that need to wait until the fuse driver
722 # is fully initialized should wait() on this event object.
723 self.initlock = threading.Event()
726 # Allow threads that are waiting for the driver to be finished
727 # initializing to continue
730 def access(self, inode, mode, ctx):
# Build the attribute record for `inode`; raises ENOENT for unknown inodes.
733 def getattr(self, inode):
734 if inode not in self.inodes:
735 raise llfuse.FUSEError(errno.ENOENT)
737 e = self.inodes[inode]
739 entry = llfuse.EntryAttributes()
# llfuse cache timeouts for the entry and its attributes (seconds).
742 entry.entry_timeout = 300
743 entry.attr_timeout = 300
# Everything is readable by user, group and others; nothing is writable.
745 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
746 if isinstance(e, Directory):
# Directories additionally get the execute (search) bits.
747 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
748 elif isinstance(e, StreamReaderFile):
# Collection files are regular files marked executable.
749 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
# Any other node is a plain regular file.
751 entry.st_mode |= stat.S_IFREG
754 entry.st_uid = self.uid
755 entry.st_gid = self.gid
758 entry.st_size = e.size()
760 entry.st_blksize = 512
# NOTE(review): relies on Python 2 integer division; under Python 3 this
# would produce a float.
761 entry.st_blocks = (e.size()/512)+1
762 entry.st_atime = int(e.atime())
763 entry.st_mtime = int(e.mtime())
# ctime mirrors mtime; no separate change time is taken from the node.
764 entry.st_ctime = int(e.mtime())
# Resolve `name` inside directory `parent_inode` and return its attributes.
768 def lookup(self, parent_inode, name):
# The name is decoded to unicode using the configured encoding.
769 name = unicode(name, self.encoding)
770 _logger.debug("arv-mount lookup: parent_inode %i name %s",
777 if parent_inode in self.inodes:
778 p = self.inodes[parent_inode]
# Branch resolving to the parent directory's inode (presumably for '..' --
# TODO confirm).
780 inode = p.parent_inode
# `name in p` goes through the directory's __contains__, which for some
# directory types creates the entry on demand.
781 elif isinstance(p, Directory) and name in p:
782 inode = p[name].inode
785 return self.getattr(inode)
787 raise llfuse.FUSEError(errno.ENOENT)
# Open a file read-only and allocate a handle number for it.
789 def open(self, inode, flags):
790 if inode in self.inodes:
791 p = self.inodes[inode]
793 raise llfuse.FUSEError(errno.ENOENT)
# Write access is refused: the filesystem is read-only.
795 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
796 raise llfuse.FUSEError(errno.EROFS)
798 if isinstance(p, Directory):
799 raise llfuse.FUSEError(errno.EISDIR)
# Handle numbers come from a monotonically increasing counter.
801 fh = self._filehandles_counter
802 self._filehandles_counter += 1
803 self._filehandles[fh] = FileHandle(fh, p)
# Read `size` bytes at offset `off` from the file behind handle `fh`.
806 def read(self, fh, off, size):
807 _logger.debug("arv-mount read %i %i %i", fh, off, size)
808 if fh in self._filehandles:
809 handle = self._filehandles[fh]
811 raise llfuse.FUSEError(errno.EBADF)
# Record the access time; FreshBase compares it with the last update.
814 handle.entry._atime = time.time()
# The data fetch runs with the global llfuse lock released.
817 with llfuse.lock_released:
818 return handle.entry.readfrom(off, size)
# A missing block is reported to the caller as an I/O error.
819 except arvados.errors.NotFoundError as e:
820 _logger.warning("Block not found: " + str(e))
821 raise llfuse.FUSEError(errno.EIO)
824 raise llfuse.FUSEError(errno.EIO)
# Forget a file handle; unknown handles are ignored.
826 def release(self, fh):
827 if fh in self._filehandles:
828 del self._filehandles[fh]
# Open a directory and snapshot its listing under a new handle number.
830 def opendir(self, inode):
831 _logger.debug("arv-mount opendir: inode %i", inode)
833 if inode in self.inodes:
834 p = self.inodes[inode]
836 raise llfuse.FUSEError(errno.ENOENT)
838 if not isinstance(p, Directory):
839 raise llfuse.FUSEError(errno.ENOTDIR)
841 fh = self._filehandles_counter
842 self._filehandles_counter += 1
# The parent is needed for the '..' entry of the listing.
843 if p.parent_inode in self.inodes:
844 parent = self.inodes[p.parent_inode]
846 raise llfuse.FUSEError(errno.EIO)
849 p._atime = time.time()
# The handle stores a list of (name, object) pairs: '.', '..', then the
# directory's own items as they are at this moment.
851 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
# Generator over the snapshot taken by opendir(), starting at index `off`.
854 def readdir(self, fh, off):
855 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
857 if fh in self._filehandles:
858 handle = self._filehandles[fh]
860 raise llfuse.FUSEError(errno.EBADF)
862 _logger.debug("arv-mount handle.entry %s", handle.entry)
865 while e < len(handle.entry):
# Only entries whose inode is still registered are considered.
866 if handle.entry[e][1].inode in self.inodes:
# Yield (encoded name, attributes, offset of the next entry).
868 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
# Raised by encode() for names the configured encoding cannot represent.
869 except UnicodeEncodeError:
# NOTE(review): unlike release(), this raises KeyError for an unknown
# handle -- confirm that is intended.
873 def releasedir(self, fh):
874 del self._filehandles[fh]
877 st = llfuse.StatvfsData()
# Reported filesystem block size: 64 KiB.
878 st.f_bsize = 64 * 1024
891 # The llfuse documentation recommends only overloading functions that
892 # are actually implemented, as the default implementation will raise ENOSYS.
893 # However, there is a bug in the llfuse default implementation of create()
894 # "create() takes exactly 5 positional arguments (6 given)" which will crash
896 # The workaround is to implement it with the proper number of parameters,
897 # and then everything works out.
# File creation is always refused: the filesystem is read-only.
898 def create(self, inode_parent, name, mode, flags, ctx):
899 raise llfuse.FUSEError(errno.EROFS)