2 # FUSE driver for Arvados Keep
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
26 _logger = logging.getLogger('arvados.arvados_fuse')
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
34 '''Parse Arvados timestamp to unix time.'''
36 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
37 except (TypeError, ValueError):
40 def sanitize_filename(dirty):
41 '''Replace disallowed filename characters with harmless "_".'''
51 return _disallowed_filename_characters.sub('_', dirty)
54 class FreshBase(object):
55 '''Base class for maintaining fresh/stale state to determine when to update.'''
59 self._last_update = time.time()
60 self._atime = time.time()
63 # Mark the value as stale
67 # Test if the entries dict is stale.
72 return (self._last_update + self._poll_time) < self._atime
77 self._last_update = time.time()
82 class File(FreshBase):
83 '''Base for file objects.'''
85 def __init__(self, parent_inode, _mtime=0):
86 super(File, self).__init__()
88 self.parent_inode = parent_inode
94 def readfrom(self, off, size):
101 class StreamReaderFile(File):
102 '''Wraps a StreamFileReader as a file.'''
104 def __init__(self, parent_inode, reader, _mtime):
105 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
109 return self.reader.size()
111 def readfrom(self, off, size):
112 return self.reader.readfrom(off, size)
118 class StringFile(File):
119 '''Wrap a simple string as a file'''
120 def __init__(self, parent_inode, contents, _mtime):
121 super(StringFile, self).__init__(parent_inode, _mtime)
122 self.contents = contents
125 return len(self.contents)
127 def readfrom(self, off, size):
128 return self.contents[off:(off+size)]
131 class ObjectFile(StringFile):
132 '''Wrap a dict as a serialized json object.'''
134 def __init__(self, parent_inode, obj):
135 super(ObjectFile, self).__init__(parent_inode, "", 0)
136 self.uuid = obj['uuid']
139 def update(self, obj):
140 self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
141 self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
144 class Directory(FreshBase):
145 '''Generic directory object, backed by a dict.
146 Consists of a set of entries with the key representing the filename
147 and the value referencing a File or Directory object.
150 def __init__(self, parent_inode):
151 super(Directory, self).__init__()
153 '''parent_inode is the integer inode number'''
155 if not isinstance(parent_inode, int):
156 raise Exception("parent_inode should be an int")
157 self.parent_inode = parent_inode
159 self._mtime = time.time()
161 # Overridden by subclasses to implement logic to update the entries dict
162 # when the directory is stale
166 # Only used when computing the size of the disk footprint of the directory
171 def checkupdate(self):
175 except apiclient.errors.HttpError as e:
178 def __getitem__(self, item):
180 return self._entries[item]
184 return self._entries.items()
188 return self._entries.iterkeys()
190 def __contains__(self, k):
192 return k in self._entries
194 def merge(self, items, fn, same, new_entry):
195 '''Helper method for updating the contents of the directory. Takes a list
196 describing the new contents of the directory, reuse entries that are
197 the same in both the old and new lists, create new entries, and delete
198 old entries missing from the new list.
200 items: iterable with new directory contents
202 fn: function to take an entry in 'items' and return the desired file or
203 directory name, or None if this entry should be skipped
205 same: function to compare an existing entry (a File or Directory
206 object) with an entry in the items list to determine whether to keep
209 new_entry: function to create a new directory entry (File or Directory
210 object) from an entry in the items list.
214 oldentries = self._entries
218 name = sanitize_filename(fn(i))
220 if name in oldentries and same(oldentries[name], i):
221 # move existing directory entry over
222 self._entries[name] = oldentries[name]
225 # create new directory entry
228 self._entries[name] = self.inodes.add_entry(ent)
231 # delete any other directory entries that were not found in 'items'
233 llfuse.invalidate_entry(self.inode, str(i))
234 self.inodes.del_entry(oldentries[i])
238 self._mtime = time.time()
243 '''Delete all entries'''
244 oldentries = self._entries
247 if isinstance(n, Directory):
249 llfuse.invalidate_entry(self.inode, str(n))
250 self.inodes.del_entry(oldentries[n])
251 llfuse.invalidate_inode(self.inode)
258 class CollectionDirectory(Directory):
259 '''Represents the root of a directory tree holding a collection.'''
261 def __init__(self, parent_inode, inodes, api, num_retries, collection):
262 super(CollectionDirectory, self).__init__(parent_inode)
265 self.num_retries = num_retries
266 self.collection_object_file = None
267 self.collection_object = None
268 if isinstance(collection, dict):
269 self.collection_locator = collection['uuid']
271 self.collection_locator = collection
274 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
276 # Used by arv-web.py to switch the contents of the CollectionDirectory
277 def change_collection(self, new_locator):
278 """Switch the contents of the CollectionDirectory. Must be called with llfuse.lock held."""
279 self.collection_locator = new_locator
280 self.collection_object = None
283 def new_collection(self, new_collection_object, coll_reader):
284 self.collection_object = new_collection_object
286 if self.collection_object_file is not None:
287 self.collection_object_file.update(self.collection_object)
290 for s in coll_reader.all_streams():
292 for part in s.name().split('/'):
293 if part != '' and part != '.':
294 partname = sanitize_filename(part)
295 if partname not in cwd._entries:
296 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
297 cwd = cwd._entries[partname]
298 for k, v in s.files().items():
299 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
303 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
306 if self.collection_locator is None:
310 with llfuse.lock_released:
311 coll_reader = arvados.CollectionReader(
312 self.collection_locator, self.api, self.api.localkeep(),
313 num_retries=self.num_retries)
314 new_collection_object = coll_reader.api_response() or {}
315 # If the Collection only exists in Keep, there will be no API
316 # response. Fill in the fields we need.
317 if 'uuid' not in new_collection_object:
318 new_collection_object['uuid'] = self.collection_locator
319 if "portable_data_hash" not in new_collection_object:
320 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
321 if 'manifest_text' not in new_collection_object:
322 new_collection_object['manifest_text'] = coll_reader.manifest_text()
323 coll_reader.normalize()
324 # end with llfuse.lock_released, re-acquire lock
326 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
327 self.new_collection(new_collection_object, coll_reader)
331 except arvados.errors.NotFoundError:
332 _logger.exception("arv-mount %s: error", self.collection_locator)
333 except arvados.errors.ArgumentError as detail:
334 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
335 if self.collection_object is not None and "manifest_text" in self.collection_object:
336 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
338 _logger.exception("arv-mount %s: error", self.collection_locator)
339 if self.collection_object is not None and "manifest_text" in self.collection_object:
340 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
343 def __getitem__(self, item):
345 if item == '.arvados#collection':
346 if self.collection_object_file is None:
347 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
348 self.inodes.add_entry(self.collection_object_file)
349 return self.collection_object_file
351 return super(CollectionDirectory, self).__getitem__(item)
353 def __contains__(self, k):
354 if k == '.arvados#collection':
357 return super(CollectionDirectory, self).__contains__(k)
361 return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
364 class MagicDirectory(Directory):
365 '''A special directory that logically contains the set of all extant keep
366 locators. When a file is referenced by lookup(), it is tested to see if it
367 is a valid keep locator to a manifest, and if so, loads the manifest
368 contents as a subdirectory of this directory with the locator as the
369 directory name. Since querying a list of all extant keep locators is
370 impractical, only collections that have already been accessed are visible
375 This directory provides access to Arvados collections as subdirectories listed
376 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
377 the form '1234567890abcdefghijklmnopqrstuv+123').
379 Note that this directory will appear empty until you attempt to access a
380 specific collection subdirectory (such as trying to 'cd' into it), at which
381 point the collection will actually be looked up on the server and the directory
382 will appear if it exists.
385 def __init__(self, parent_inode, inodes, api, num_retries):
386 super(MagicDirectory, self).__init__(parent_inode)
389 self.num_retries = num_retries
391 def __setattr__(self, name, value):
392 super(MagicDirectory, self).__setattr__(name, value)
393 # When we're assigned an inode, add a README.
394 if ((name == 'inode') and (self.inode is not None) and
395 (not self._entries)):
396 self._entries['README'] = self.inodes.add_entry(
397 StringFile(self.inode, self.README_TEXT, time.time()))
398 # If we're the root directory, add an identical by_id subdirectory.
399 if self.inode == llfuse.ROOT_INODE:
400 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
401 self.inode, self.inodes, self.api, self.num_retries))
403 def __contains__(self, k):
404 if k in self._entries:
407 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
411 e = self.inodes.add_entry(CollectionDirectory(
412 self.inode, self.inodes, self.api, self.num_retries, k))
418 except Exception as e:
419 _logger.debug('arv-mount exception keep %s', e)
422 def __getitem__(self, item):
424 return self._entries[item]
426 raise KeyError("No collection with id " + item)
429 class RecursiveInvalidateDirectory(Directory):
430 def invalidate(self):
431 if self.inode == llfuse.ROOT_INODE:
432 llfuse.lock.acquire()
434 super(RecursiveInvalidateDirectory, self).invalidate()
435 for a in self._entries:
436 self._entries[a].invalidate()
440 if self.inode == llfuse.ROOT_INODE:
441 llfuse.lock.release()
444 class TagsDirectory(RecursiveInvalidateDirectory):
445 '''A special directory that contains as subdirectories all tags visible to the user.'''
447 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
448 super(TagsDirectory, self).__init__(parent_inode)
451 self.num_retries = num_retries
453 self._poll_time = poll_time
456 with llfuse.lock_released:
457 tags = self.api.links().list(
458 filters=[['link_class', '=', 'tag']],
459 select=['name'], distinct=True
460 ).execute(num_retries=self.num_retries)
462 self.merge(tags['items'],
464 lambda a, i: a.tag == i['name'],
465 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
468 class TagDirectory(Directory):
469 '''A special directory that contains as subdirectories all collections visible
470 to the user that are tagged with a particular tag.
473 def __init__(self, parent_inode, inodes, api, num_retries, tag,
474 poll=False, poll_time=60):
475 super(TagDirectory, self).__init__(parent_inode)
478 self.num_retries = num_retries
481 self._poll_time = poll_time
484 with llfuse.lock_released:
485 taggedcollections = self.api.links().list(
486 filters=[['link_class', '=', 'tag'],
487 ['name', '=', self.tag],
488 ['head_uuid', 'is_a', 'arvados#collection']],
490 ).execute(num_retries=self.num_retries)
491 self.merge(taggedcollections['items'],
492 lambda i: i['head_uuid'],
493 lambda a, i: a.collection_locator == i['head_uuid'],
494 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
497 class ProjectDirectory(Directory):
498 '''A special directory that contains the contents of a project.'''
500 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
501 poll=False, poll_time=60):
502 super(ProjectDirectory, self).__init__(parent_inode)
505 self.num_retries = num_retries
506 self.project_object = project_object
507 self.project_object_file = None
508 self.uuid = project_object['uuid']
510 self._poll_time = poll_time
512 def createDirectory(self, i):
513 if collection_uuid_pattern.match(i['uuid']):
514 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
515 elif group_uuid_pattern.match(i['uuid']):
516 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
517 elif link_uuid_pattern.match(i['uuid']):
518 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
519 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
522 elif uuid_pattern.match(i['uuid']):
523 return ObjectFile(self.parent_inode, i)
528 if self.project_object_file == None:
529 self.project_object_file = ObjectFile(self.inode, self.project_object)
530 self.inodes.add_entry(self.project_object_file)
534 if i['name'] is None or len(i['name']) == 0:
536 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
537 # collection or subproject
539 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
542 elif 'kind' in i and i['kind'].startswith('arvados#'):
544 return "{}.{}".format(i['name'], i['kind'][8:])
549 if isinstance(a, CollectionDirectory):
550 return a.collection_locator == i['uuid']
551 elif isinstance(a, ProjectDirectory):
552 return a.uuid == i['uuid']
553 elif isinstance(a, ObjectFile):
554 return a.uuid == i['uuid'] and not a.stale()
557 with llfuse.lock_released:
558 if group_uuid_pattern.match(self.uuid):
559 self.project_object = self.api.groups().get(
560 uuid=self.uuid).execute(num_retries=self.num_retries)
561 elif user_uuid_pattern.match(self.uuid):
562 self.project_object = self.api.users().get(
563 uuid=self.uuid).execute(num_retries=self.num_retries)
565 contents = arvados.util.list_all(self.api.groups().contents,
566 self.num_retries, uuid=self.uuid)
568 # end with llfuse.lock_released, re-acquire lock
573 self.createDirectory)
575 def __getitem__(self, item):
577 if item == '.arvados#project':
578 return self.project_object_file
580 return super(ProjectDirectory, self).__getitem__(item)
582 def __contains__(self, k):
583 if k == '.arvados#project':
586 return super(ProjectDirectory, self).__contains__(k)
589 class SharedDirectory(Directory):
590 '''A special directory that represents users or groups who have shared projects with me.'''
592 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
593 poll=False, poll_time=60):
594 super(SharedDirectory, self).__init__(parent_inode)
597 self.num_retries = num_retries
598 self.current_user = api.users().current().execute(num_retries=num_retries)
600 self._poll_time = poll_time
603 with llfuse.lock_released:
604 all_projects = arvados.util.list_all(
605 self.api.groups().list, self.num_retries,
606 filters=[['group_class','=','project']])
608 for ob in all_projects:
609 objects[ob['uuid']] = ob
613 for ob in all_projects:
614 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
616 root_owners[ob['owner_uuid']] = True
618 lusers = arvados.util.list_all(
619 self.api.users().list, self.num_retries,
620 filters=[['uuid','in', list(root_owners)]])
621 lgroups = arvados.util.list_all(
622 self.api.groups().list, self.num_retries,
623 filters=[['uuid','in', list(root_owners)]])
629 objects[l["uuid"]] = l
631 objects[l["uuid"]] = l
634 for r in root_owners:
638 contents[obr["name"]] = obr
639 if "first_name" in obr:
640 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
643 if r['owner_uuid'] not in objects:
644 contents[r['name']] = r
646 # end with llfuse.lock_released, re-acquire lock
649 self.merge(contents.items(),
651 lambda a, i: a.uuid == i[1]['uuid'],
652 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
657 class FileHandle(object):
658 '''Connects a numeric file handle to a File or Directory object that has
659 been opened by the client.'''
661 def __init__(self, fh, entry):
666 class Inodes(object):
667 '''Manage the set of inodes. This is the mapping from a numeric id
668 to a concrete File or Directory object'''
672 self._counter = itertools.count(llfuse.ROOT_INODE)
674 def __getitem__(self, item):
675 return self._entries[item]
677 def __setitem__(self, key, item):
678 self._entries[key] = item
681 return self._entries.iterkeys()
684 return self._entries.items()
686 def __contains__(self, k):
687 return k in self._entries
689 def add_entry(self, entry):
690 entry.inode = next(self._counter)
691 self._entries[entry.inode] = entry
694 def del_entry(self, entry):
695 llfuse.invalidate_inode(entry.inode)
696 del self._entries[entry.inode]
698 class Operations(llfuse.Operations):
699 '''This is the main interface with llfuse. The methods on this object are
700 called by llfuse threads to service FUSE events to query and read from
703 llfuse has its own global lock which is acquired before calling a request handler,
704 so request handlers do not run concurrently unless the lock is explicitly released
705 using "with llfuse.lock_released:"'''
707 def __init__(self, uid, gid, encoding="utf-8"):
708 super(Operations, self).__init__()
710 self.inodes = Inodes()
713 self.encoding = encoding
715 # dict of inode to filehandle
716 self._filehandles = {}
717 self._filehandles_counter = 1
719 # Other threads that need to wait until the fuse driver
720 # is fully initialized should wait() on this event object.
721 self.initlock = threading.Event()
724 # Allow threads that are waiting for the driver to be finished
725 # initializing to continue
728 def access(self, inode, mode, ctx):
731 def getattr(self, inode):
732 if inode not in self.inodes:
733 raise llfuse.FUSEError(errno.ENOENT)
735 e = self.inodes[inode]
737 entry = llfuse.EntryAttributes()
740 entry.entry_timeout = 300
741 entry.attr_timeout = 300
743 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
744 if isinstance(e, Directory):
745 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
746 elif isinstance(e, StreamReaderFile):
747 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
749 entry.st_mode |= stat.S_IFREG
752 entry.st_uid = self.uid
753 entry.st_gid = self.gid
756 entry.st_size = e.size()
758 entry.st_blksize = 512
759 entry.st_blocks = (e.size()/512)+1
760 entry.st_atime = int(e.atime())
761 entry.st_mtime = int(e.mtime())
762 entry.st_ctime = int(e.mtime())
766 def lookup(self, parent_inode, name):
767 name = unicode(name, self.encoding)
768 _logger.debug("arv-mount lookup: parent_inode %i name %s",
775 if parent_inode in self.inodes:
776 p = self.inodes[parent_inode]
778 inode = p.parent_inode
779 elif isinstance(p, Directory) and name in p:
780 inode = p[name].inode
783 return self.getattr(inode)
785 raise llfuse.FUSEError(errno.ENOENT)
787 def open(self, inode, flags):
788 if inode in self.inodes:
789 p = self.inodes[inode]
791 raise llfuse.FUSEError(errno.ENOENT)
793 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
794 raise llfuse.FUSEError(errno.EROFS)
796 if isinstance(p, Directory):
797 raise llfuse.FUSEError(errno.EISDIR)
799 fh = self._filehandles_counter
800 self._filehandles_counter += 1
801 self._filehandles[fh] = FileHandle(fh, p)
804 def read(self, fh, off, size):
805 _logger.debug("arv-mount read %i %i %i", fh, off, size)
806 if fh in self._filehandles:
807 handle = self._filehandles[fh]
809 raise llfuse.FUSEError(errno.EBADF)
812 handle.entry._atime = time.time()
815 with llfuse.lock_released:
816 return handle.entry.readfrom(off, size)
817 except arvados.errors.NotFoundError as e:
818 _logger.warning("Block not found: " + str(e))
819 raise llfuse.FUSEError(errno.EIO)
822 raise llfuse.FUSEError(errno.EIO)
824 def release(self, fh):
825 if fh in self._filehandles:
826 del self._filehandles[fh]
828 def opendir(self, inode):
829 _logger.debug("arv-mount opendir: inode %i", inode)
831 if inode in self.inodes:
832 p = self.inodes[inode]
834 raise llfuse.FUSEError(errno.ENOENT)
836 if not isinstance(p, Directory):
837 raise llfuse.FUSEError(errno.ENOTDIR)
839 fh = self._filehandles_counter
840 self._filehandles_counter += 1
841 if p.parent_inode in self.inodes:
842 parent = self.inodes[p.parent_inode]
844 raise llfuse.FUSEError(errno.EIO)
847 p._atime = time.time()
849 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
852 def readdir(self, fh, off):
853 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
855 if fh in self._filehandles:
856 handle = self._filehandles[fh]
858 raise llfuse.FUSEError(errno.EBADF)
860 _logger.debug("arv-mount handle.entry %s", handle.entry)
863 while e < len(handle.entry):
864 if handle.entry[e][1].inode in self.inodes:
866 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
867 except UnicodeEncodeError:
871 def releasedir(self, fh):
872 del self._filehandles[fh]
875 st = llfuse.StatvfsData()
876 st.f_bsize = 64 * 1024
889 # The llfuse documentation recommends only overloading functions that
890 # are actually implemented, as the default implementation will raise ENOSYS.
891 # However, there is a bug in the llfuse default implementation of create()
892 # "create() takes exactly 5 positional arguments (6 given)" which will crash
894 # The workaround is to implement it with the proper number of parameters,
895 # and then everything works out.
896 def create(self, inode_parent, name, mode, flags, ctx):
897 raise llfuse.FUSEError(errno.EROFS)