2 # FUSE driver for Arvados Keep
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
26 _logger = logging.getLogger('arvados.arvados_fuse')
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
def convertTime(t):
    '''Parse Arvados timestamp to unix time.

    t is expected in the API server's format, "%Y-%m-%dT%H:%M:%SZ" (UTC).
    Returns 0 when t is None or malformed, matching the 0 default callers
    use for a missing "modified_at" field (so mtime() always yields a
    number that int() can accept).
    '''
    try:
        return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
    except (TypeError, ValueError):
        # TypeError: t is None / not a string; ValueError: wrong format.
        return 0
def sanitize_filename(dirty):
    '''Replace disallowed filename characters with harmless "_".

    None passes through unchanged.  The reserved names "", "." and ".."
    map to fixed placeholders so they cannot collide with the entries
    FUSE itself reserves; any NUL or "/" inside other names becomes "_".
    '''
    if dirty is None:
        return None
    # Reserved / empty names get fixed substitutes.
    reserved = {'': '_', '.': '_', '..': '__'}
    if dirty in reserved:
        return reserved[dirty]
    # NUL and '/' are the only bytes a POSIX filename component cannot hold.
    return re.sub(r'[\x00/]', '_', dirty)
class FreshBase(object):
    '''Base class for maintaining fresh/stale state to determine when to update.'''

    def __init__(self):
        # Start stale so the first access triggers an update.
        self._stale = True
        self._poll = False
        self._last_update = time.time()
        self._atime = time.time()
        self._poll_time = 60

    def invalidate(self):
        '''Mark the cached value as stale.'''
        self._stale = True

    def stale(self):
        '''Return whether the entries dict needs refreshing.

        Stale when explicitly invalidated, or — for polling objects —
        when the object has been accessed since more than _poll_time
        seconds after the last update.
        '''
        return self._stale or (
            self._poll and (self._last_update + self._poll_time) < self._atime)

    def fresh(self):
        '''Record that the cached value was just refreshed.'''
        self._stale = False
        self._last_update = time.time()

    def atime(self):
        '''Last access time as a unix timestamp.'''
        return self._atime
class File(FreshBase):
    '''Base for file objects.'''

    def __init__(self, parent_inode, _mtime=0):
        super(File, self).__init__()
        # Inode number; filled in later by Inodes.add_entry().
        self.inode = None
        self.parent_inode = parent_inode
        self._mtime = _mtime

    def size(self):
        '''Size in bytes; overridden by concrete file classes.'''
        return 0

    def readfrom(self, off, size):
        '''Return up to size bytes starting at off; overridden by subclasses.'''
        return ''

    def mtime(self):
        '''Modification time as a unix timestamp.'''
        return self._mtime
class StreamReaderFile(File):
    '''Wraps a StreamFileReader as a file.'''

    def __init__(self, parent_inode, reader, _mtime):
        super(StreamReaderFile, self).__init__(parent_inode, _mtime)
        # Underlying stream reader supplying size and byte-range reads.
        self.reader = reader

    def size(self):
        # Delegate to the stream reader's reported length.
        return self.reader.size()

    def readfrom(self, off, size):
        # Delegate byte-range reads straight through.
        return self.reader.readfrom(off, size)

    def stale(self):
        # Collection stream contents are immutable once loaded.
        return False
class StringFile(File):
    '''Wrap a simple string as a file'''

    def __init__(self, parent_inode, contents, _mtime):
        super(StringFile, self).__init__(parent_inode, _mtime)
        # The entire file body, kept in memory.
        self.contents = contents

    def size(self):
        # File size is just the string length.
        return len(self.contents)

    def readfrom(self, off, size):
        # Serve reads by slicing the in-memory string.
        end = off + size
        return self.contents[off:end]
class ObjectFile(StringFile):
    '''Wrap a dict as a serialized json object.'''

    def __init__(self, parent_inode, obj):
        super(ObjectFile, self).__init__(parent_inode, "", 0)
        self.uuid = obj['uuid']
        # Populate contents/_mtime from the initial object state.
        self.update(obj)

    def update(self, obj):
        '''Re-serialize obj into contents and refresh _mtime.'''
        # Fall back to 0 when the record carries no modified_at field.
        self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
        self.contents = json.dumps(obj, sort_keys=True, indent=4) + "\n"
144 class Directory(FreshBase):
145 '''Generic directory object, backed by a dict.
146 Consists of a set of entries with the key representing the filename
147 and the value referencing a File or Directory object.
150 def __init__(self, parent_inode):
151 super(Directory, self).__init__()
153 '''parent_inode is the integer inode number'''
155 if not isinstance(parent_inode, int):
156 raise Exception("parent_inode should be an int")
157 self.parent_inode = parent_inode
159 self._mtime = time.time()
161 # Overriden by subclasses to implement logic to update the entries dict
162 # when the directory is stale
166 # Only used when computing the size of the disk footprint of the directory
171 def checkupdate(self):
175 except apiclient.errors.HttpError as e:
178 def __getitem__(self, item):
180 return self._entries[item]
184 return self._entries.items()
188 return self._entries.iterkeys()
190 def __contains__(self, k):
192 return k in self._entries
194 def merge(self, items, fn, same, new_entry):
195 '''Helper method for updating the contents of the directory. Takes a list
196 describing the new contents of the directory, reuse entries that are
197 the same in both the old and new lists, create new entries, and delete
198 old entries missing from the new list.
200 items: iterable with new directory contents
202 fn: function to take an entry in 'items' and return the desired file or
203 directory name, or None if this entry should be skipped
205 same: function to compare an existing entry (a File or Directory
206 object) with an entry in the items list to determine whether to keep
209 new_entry: function to create a new directory entry (File or Directory
210 object) from an entry in the items list.
214 oldentries = self._entries
218 name = sanitize_filename(fn(i))
220 if name in oldentries and same(oldentries[name], i):
221 # move existing directory entry over
222 self._entries[name] = oldentries[name]
225 # create new directory entry
228 self._entries[name] = self.inodes.add_entry(ent)
231 # delete any other directory entries that were not in found in 'items'
233 llfuse.invalidate_entry(self.inode, str(i))
234 self.inodes.del_entry(oldentries[i])
238 self._mtime = time.time()
243 '''Delete all entries'''
244 oldentries = self._entries
247 if isinstance(n, Directory):
249 llfuse.invalidate_entry(self.inode, str(n))
250 self.inodes.del_entry(oldentries[n])
257 class CollectionDirectory(Directory):
258 '''Represents the root of a directory tree holding a collection.'''
260 def __init__(self, parent_inode, inodes, api, num_retries, collection):
261 super(CollectionDirectory, self).__init__(parent_inode)
264 self.num_retries = num_retries
265 self.collection_object_file = None
266 self.collection_object = None
267 if isinstance(collection, dict):
268 self.collection_locator = collection['uuid']
270 self.collection_locator = collection
273 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
275 def new_collection(self, new_collection_object, coll_reader):
276 self.collection_object = new_collection_object
278 if self.collection_object_file is not None:
279 self.collection_object_file.update(self.collection_object)
282 for s in coll_reader.all_streams():
284 for part in s.name().split('/'):
285 if part != '' and part != '.':
286 partname = sanitize_filename(part)
287 if partname not in cwd._entries:
288 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
289 cwd = cwd._entries[partname]
290 for k, v in s.files().items():
291 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
295 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
298 with llfuse.lock_released:
299 coll_reader = arvados.CollectionReader(
300 self.collection_locator, self.api, self.api.localkeep(),
301 num_retries=self.num_retries)
302 new_collection_object = coll_reader.api_response() or {}
303 # If the Collection only exists in Keep, there will be no API
304 # response. Fill in the fields we need.
305 if 'uuid' not in new_collection_object:
306 new_collection_object['uuid'] = self.collection_locator
307 if "portable_data_hash" not in new_collection_object:
308 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
309 if 'manifest_text' not in new_collection_object:
310 new_collection_object['manifest_text'] = coll_reader.manifest_text()
311 coll_reader.normalize()
312 # end with llfuse.lock_released, re-acquire lock
314 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
315 self.new_collection(new_collection_object, coll_reader)
319 except apiclient.errors.NotFoundError:
320 _logger.exception("arv-mount %s: error", self.collection_locator)
321 except arvados.errors.ArgumentError as detail:
322 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
323 if self.collection_object is not None and "manifest_text" in self.collection_object:
324 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
326 _logger.exception("arv-mount %s: error", self.collection_locator)
327 if self.collection_object is not None and "manifest_text" in self.collection_object:
328 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
331 def __getitem__(self, item):
333 if item == '.arvados#collection':
334 if self.collection_object_file is None:
335 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
336 self.inodes.add_entry(self.collection_object_file)
337 return self.collection_object_file
339 return super(CollectionDirectory, self).__getitem__(item)
341 def __contains__(self, k):
342 if k == '.arvados#collection':
345 return super(CollectionDirectory, self).__contains__(k)
349 return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
352 class MagicDirectory(Directory):
353 '''A special directory that logically contains the set of all extant keep
354 locators. When a file is referenced by lookup(), it is tested to see if it
355 is a valid keep locator to a manifest, and if so, loads the manifest
356 contents as a subdirectory of this directory with the locator as the
357 directory name. Since querying a list of all extant keep locators is
358 impractical, only collections that have already been accessed are visible
363 This directory provides access to Arvados collections as subdirectories listed
364 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
365 the form '1234567890abcdefghijklmnopqrstuv+123').
367 Note that this directory will appear empty until you attempt to access a
368 specific collection subdirectory (such as trying to 'cd' into it), at which
369 point the collection will actually be looked up on the server and the directory
370 will appear if it exists.
373 def __init__(self, parent_inode, inodes, api, num_retries):
374 super(MagicDirectory, self).__init__(parent_inode)
377 self.num_retries = num_retries
379 def __setattr__(self, name, value):
380 super(MagicDirectory, self).__setattr__(name, value)
381 # When we're assigned an inode, add a README.
382 if ((name == 'inode') and (self.inode is not None) and
383 (not self._entries)):
384 self._entries['README'] = self.inodes.add_entry(
385 StringFile(self.inode, self.README_TEXT, time.time()))
386 # If we're the root directory, add an identical by_id subdirectory.
387 if self.inode == llfuse.ROOT_INODE:
388 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
389 self.inode, self.inodes, self.api, self.num_retries))
391 def __contains__(self, k):
392 if k in self._entries:
395 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
399 e = self.inodes.add_entry(CollectionDirectory(
400 self.inode, self.inodes, self.api, self.num_retries, k))
406 except Exception as e:
407 _logger.debug('arv-mount exception keep %s', e)
410 def __getitem__(self, item):
412 return self._entries[item]
414 raise KeyError("No collection with id " + item)
417 class RecursiveInvalidateDirectory(Directory):
418 def invalidate(self):
419 if self.inode == llfuse.ROOT_INODE:
420 llfuse.lock.acquire()
422 super(RecursiveInvalidateDirectory, self).invalidate()
423 for a in self._entries:
424 self._entries[a].invalidate()
428 if self.inode == llfuse.ROOT_INODE:
429 llfuse.lock.release()
432 class TagsDirectory(RecursiveInvalidateDirectory):
433 '''A special directory that contains as subdirectories all tags visible to the user.'''
435 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
436 super(TagsDirectory, self).__init__(parent_inode)
439 self.num_retries = num_retries
441 self._poll_time = poll_time
444 with llfuse.lock_released:
445 tags = self.api.links().list(
446 filters=[['link_class', '=', 'tag']],
447 select=['name'], distinct=True
448 ).execute(num_retries=self.num_retries)
450 self.merge(tags['items'],
451 lambda i: i['name'] if 'name' in i else i['uuid'],
452 lambda a, i: a.tag == i,
453 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
456 class TagDirectory(Directory):
457 '''A special directory that contains as subdirectories all collections visible
458 to the user that are tagged with a particular tag.
461 def __init__(self, parent_inode, inodes, api, num_retries, tag,
462 poll=False, poll_time=60):
463 super(TagDirectory, self).__init__(parent_inode)
466 self.num_retries = num_retries
469 self._poll_time = poll_time
472 with llfuse.lock_released:
473 taggedcollections = self.api.links().list(
474 filters=[['link_class', '=', 'tag'],
475 ['name', '=', self.tag],
476 ['head_uuid', 'is_a', 'arvados#collection']],
478 ).execute(num_retries=self.num_retries)
479 self.merge(taggedcollections['items'],
480 lambda i: i['head_uuid'],
481 lambda a, i: a.collection_locator == i['head_uuid'],
482 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
485 class ProjectDirectory(Directory):
486 '''A special directory that contains the contents of a project.'''
488 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
489 poll=False, poll_time=60):
490 super(ProjectDirectory, self).__init__(parent_inode)
493 self.num_retries = num_retries
494 self.project_object = project_object
495 self.project_object_file = None
496 self.uuid = project_object['uuid']
498 self._poll_time = poll_time
500 def createDirectory(self, i):
501 if collection_uuid_pattern.match(i['uuid']):
502 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
503 elif group_uuid_pattern.match(i['uuid']):
504 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
505 elif link_uuid_pattern.match(i['uuid']):
506 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
507 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
510 elif uuid_pattern.match(i['uuid']):
511 return ObjectFile(self.parent_inode, i)
516 if self.project_object_file == None:
517 self.project_object_file = ObjectFile(self.inode, self.project_object)
518 self.inodes.add_entry(self.project_object_file)
522 if i['name'] is None or len(i['name']) == 0:
524 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
525 # collection or subproject
527 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
530 elif 'kind' in i and i['kind'].startswith('arvados#'):
532 return "{}.{}".format(i['name'], i['kind'][8:])
537 if isinstance(a, CollectionDirectory):
538 return a.collection_locator == i['uuid']
539 elif isinstance(a, ProjectDirectory):
540 return a.uuid == i['uuid']
541 elif isinstance(a, ObjectFile):
542 return a.uuid == i['uuid'] and not a.stale()
545 with llfuse.lock_released:
546 if group_uuid_pattern.match(self.uuid):
547 self.project_object = self.api.groups().get(
548 uuid=self.uuid).execute(num_retries=self.num_retries)
549 elif user_uuid_pattern.match(self.uuid):
550 self.project_object = self.api.users().get(
551 uuid=self.uuid).execute(num_retries=self.num_retries)
553 contents = arvados.util.list_all(self.api.groups().contents,
554 self.num_retries, uuid=self.uuid)
556 # end with llfuse.lock_released, re-acquire lock
561 self.createDirectory)
563 def __getitem__(self, item):
565 if item == '.arvados#project':
566 return self.project_object_file
568 return super(ProjectDirectory, self).__getitem__(item)
570 def __contains__(self, k):
571 if k == '.arvados#project':
574 return super(ProjectDirectory, self).__contains__(k)
577 class SharedDirectory(Directory):
578 '''A special directory that represents users or groups who have shared projects with me.'''
580 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
581 poll=False, poll_time=60):
582 super(SharedDirectory, self).__init__(parent_inode)
585 self.num_retries = num_retries
586 self.current_user = api.users().current().execute(num_retries=num_retries)
588 self._poll_time = poll_time
591 with llfuse.lock_released:
592 all_projects = arvados.util.list_all(
593 self.api.groups().list, self.num_retries,
594 filters=[['group_class','=','project']])
596 for ob in all_projects:
597 objects[ob['uuid']] = ob
601 for ob in all_projects:
602 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
604 root_owners[ob['owner_uuid']] = True
606 lusers = arvados.util.list_all(
607 self.api.users().list, self.num_retries,
608 filters=[['uuid','in', list(root_owners)]])
609 lgroups = arvados.util.list_all(
610 self.api.groups().list, self.num_retries,
611 filters=[['uuid','in', list(root_owners)]])
617 objects[l["uuid"]] = l
619 objects[l["uuid"]] = l
622 for r in root_owners:
626 contents[obr["name"]] = obr
627 if "first_name" in obr:
628 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
631 if r['owner_uuid'] not in objects:
632 contents[r['name']] = r
634 # end with llfuse.lock_released, re-acquire lock
637 self.merge(contents.items(),
639 lambda a, i: a.uuid == i[1]['uuid'],
640 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
class FileHandle(object):
    '''Connects a numeric file handle to a File or Directory object that has
    been opened by the client.'''

    def __init__(self, fh, entry):
        # Numeric handle issued by Operations' counter.
        self.fh = fh
        # The opened File/Directory (or directory listing) it refers to.
        self.entry = entry
class Inodes(object):
    '''Manage the set of inodes. This is the mapping from a numeric id
    to a concrete File or Directory object'''

    def __init__(self):
        # inode number -> File/Directory object
        self._entries = {}
        # Hand out inode numbers starting at the FUSE root inode.
        self._counter = itertools.count(llfuse.ROOT_INODE)

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        # Iterate inode numbers (Python 2 dict view).
        return self._entries.iterkeys()

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def add_entry(self, entry):
        '''Assign the next inode number to entry, register it, and return it.'''
        entry.inode = next(self._counter)
        self._entries[entry.inode] = entry
        return entry

    def del_entry(self, entry):
        '''Drop entry from the table and tell the kernel its inode is gone.'''
        llfuse.invalidate_inode(entry.inode)
        del self._entries[entry.inode]
686 class Operations(llfuse.Operations):
687 '''This is the main interface with llfuse. The methods on this object are
688 called by llfuse threads to service FUSE events to query and read from
691 llfuse has its own global lock which is acquired before calling a request handler,
692 so request handlers do not run concurrently unless the lock is explicitly released
693 using "with llfuse.lock_released:"'''
695 def __init__(self, uid, gid, encoding="utf-8"):
696 super(Operations, self).__init__()
698 self.inodes = Inodes()
701 self.encoding = encoding
703 # dict of inode to filehandle
704 self._filehandles = {}
705 self._filehandles_counter = 1
707 # Other threads that need to wait until the fuse driver
708 # is fully initialized should wait() on this event object.
709 self.initlock = threading.Event()
712 # Allow threads that are waiting for the driver to be finished
713 # initializing to continue
716 def access(self, inode, mode, ctx):
719 def getattr(self, inode):
720 if inode not in self.inodes:
721 raise llfuse.FUSEError(errno.ENOENT)
723 e = self.inodes[inode]
725 entry = llfuse.EntryAttributes()
728 entry.entry_timeout = 300
729 entry.attr_timeout = 300
731 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
732 if isinstance(e, Directory):
733 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
734 elif isinstance(e, StreamReaderFile):
735 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
737 entry.st_mode |= stat.S_IFREG
740 entry.st_uid = self.uid
741 entry.st_gid = self.gid
744 entry.st_size = e.size()
746 entry.st_blksize = 512
747 entry.st_blocks = (e.size()/512)+1
748 entry.st_atime = int(e.atime())
749 entry.st_mtime = int(e.mtime())
750 entry.st_ctime = int(e.mtime())
754 def lookup(self, parent_inode, name):
755 name = unicode(name, self.encoding)
756 _logger.debug("arv-mount lookup: parent_inode %i name %s",
763 if parent_inode in self.inodes:
764 p = self.inodes[parent_inode]
766 inode = p.parent_inode
767 elif isinstance(p, Directory) and name in p:
768 inode = p[name].inode
771 return self.getattr(inode)
773 raise llfuse.FUSEError(errno.ENOENT)
775 def open(self, inode, flags):
776 if inode in self.inodes:
777 p = self.inodes[inode]
779 raise llfuse.FUSEError(errno.ENOENT)
781 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
782 raise llfuse.FUSEError(errno.EROFS)
784 if isinstance(p, Directory):
785 raise llfuse.FUSEError(errno.EISDIR)
787 fh = self._filehandles_counter
788 self._filehandles_counter += 1
789 self._filehandles[fh] = FileHandle(fh, p)
792 def read(self, fh, off, size):
793 _logger.debug("arv-mount read %i %i %i", fh, off, size)
794 if fh in self._filehandles:
795 handle = self._filehandles[fh]
797 raise llfuse.FUSEError(errno.EBADF)
800 handle.entry._atime = time.time()
803 with llfuse.lock_released:
804 return handle.entry.readfrom(off, size)
805 except arvados.errors.NotFoundError as e:
806 _logger.warning("Block not found: " + str(e))
807 raise llfuse.FUSEError(errno.EIO)
810 raise llfuse.FUSEError(errno.EIO)
812 def release(self, fh):
813 if fh in self._filehandles:
814 del self._filehandles[fh]
816 def opendir(self, inode):
817 _logger.debug("arv-mount opendir: inode %i", inode)
819 if inode in self.inodes:
820 p = self.inodes[inode]
822 raise llfuse.FUSEError(errno.ENOENT)
824 if not isinstance(p, Directory):
825 raise llfuse.FUSEError(errno.ENOTDIR)
827 fh = self._filehandles_counter
828 self._filehandles_counter += 1
829 if p.parent_inode in self.inodes:
830 parent = self.inodes[p.parent_inode]
832 raise llfuse.FUSEError(errno.EIO)
835 p._atime = time.time()
837 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
840 def readdir(self, fh, off):
841 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
843 if fh in self._filehandles:
844 handle = self._filehandles[fh]
846 raise llfuse.FUSEError(errno.EBADF)
848 _logger.debug("arv-mount handle.entry %s", handle.entry)
851 while e < len(handle.entry):
852 if handle.entry[e][1].inode in self.inodes:
854 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
855 except UnicodeEncodeError:
859 def releasedir(self, fh):
860 del self._filehandles[fh]
863 st = llfuse.StatvfsData()
864 st.f_bsize = 64 * 1024
877 # The llfuse documentation recommends only overloading functions that
878 # are actually implemented, as the default implementation will raise ENOSYS.
879 # However, there is a bug in the llfuse default implementation of create()
880 # "create() takes exactly 5 positional arguments (6 given)" which will crash
882 # The workaround is to implement it with the proper number of parameters,
883 # and then everything works out.
884 def create(self, inode_parent, name, mode, flags, ctx):
885 raise llfuse.FUSEError(errno.EROFS)