2 # FUSE driver for Arvados Keep
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
26 _logger = logging.getLogger('arvados.arvados_fuse')
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
33 class SafeApi(object):
34 """Threadsafe wrapper for API object.
36 This stores and returns a different api object per thread, because
37 httplib2 which underlies apiclient is not threadsafe.
def __init__(self, config):
    """Record the connection settings and set up per-thread storage.

    config must offer get(key) and flag_is_true(key); the API host,
    the API token and the "insecure" flag are read from it once, here.
    """
    api_host = config.get('ARVADOS_API_HOST')
    token = config.get('ARVADOS_API_TOKEN')
    skip_tls_checks = config.flag_is_true('ARVADOS_API_HOST_INSECURE')

    # Settings handed to arvados.api() when a thread's client is built.
    self.host = api_host
    self.api_token = token
    self.insecure = skip_tls_checks

    # Namespace holding one set of client objects per thread.
    self.local = threading.local()
    # A single block cache object, passed to every KeepClient created
    # by this wrapper.
    self.block_cache = arvados.KeepBlockCache()
48 if 'api' not in self.local.__dict__:
49 self.local.api = arvados.api(
51 host=self.host, token=self.api_token, insecure=self.insecure)
55 if 'keep' not in self.local.__dict__:
56 self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
57 return self.local.keep
def __getattr__(self, name):
    """Proxy attributes that are missing on the wrapper to the local API client.

    Python calls __getattr__ only after normal attribute lookup has
    failed, so attributes set on the wrapper itself never reach here.

    Raises AttributeError, naming the requested attribute, when the
    object returned by localapi() does not have it either.
    """
    # Proxy nonexistent attributes to the local API client.
    try:
        return getattr(self.localapi(), name)
    except AttributeError:
        # object defines no __getattr__, so the former fallback to
        # super().__getattr__ could only fail with a misleading
        # "'super' object has no attribute '__getattr__'" message.
        # Raise the same exception type, but for the right attribute.
        raise AttributeError("{!r} object has no attribute {!r}".format(
            type(self).__name__, name))
68 """Parse Arvados timestamp to unix time."""
72 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
73 except (TypeError, ValueError):
76 def sanitize_filename(dirty):
77 '''Replace disallowed filename characters with harmless "_".'''
87 return _disallowed_filename_characters.sub('_', dirty)
90 class FreshBase(object):
91 '''Base class for maintaining fresh/stale state to determine when to update.'''
95 self._last_update = time.time()
96 self._atime = time.time()
99 # Mark the value as stale
100 def invalidate(self):
103 # Test if the entries dict is stale.
108 return (self._last_update + self._poll_time) < self._atime
113 self._last_update = time.time()
118 class File(FreshBase):
119 '''Base for file objects.'''
121 def __init__(self, parent_inode, _mtime=0):
122 super(File, self).__init__()
124 self.parent_inode = parent_inode
130 def readfrom(self, off, size):
137 class StreamReaderFile(File):
138 '''Wraps a StreamFileReader as a file.'''
140 def __init__(self, parent_inode, reader, _mtime):
141 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
145 return self.reader.size()
def readfrom(self, off, size):
    """Read from the wrapped reader.

    off and size are forwarded unchanged to self.reader.readfrom() and
    its result is returned as-is.
    """
    wrapped = self.reader
    return wrapped.readfrom(off, size)
154 class StringFile(File):
155 '''Wrap a simple string as a file'''
def __init__(self, parent_inode, contents, _mtime):
    """Create a file whose data is the in-memory value `contents`.

    parent_inode and _mtime are passed through unchanged to the
    parent class initializer.
    """
    super(StringFile, self).__init__(parent_inode, _mtime)
    # The data served by this file; replaced wholesale when it changes.
    self.contents = contents
161 return len(self.contents)
def readfrom(self, off, size):
    """Return the slice of self.contents starting at `off`, at most `size` long.

    Uses ordinary slice semantics, so a range running past the end of
    the contents yields a shorter (possibly empty) result.
    """
    end = off + size
    return self.contents[off:end]
167 class ObjectFile(StringFile):
168 '''Wrap a dict as a serialized json object.'''
170 def __init__(self, parent_inode, obj):
171 super(ObjectFile, self).__init__(parent_inode, "", 0)
172 self.uuid = obj['uuid']
def update(self, obj):
    """Refresh the modification time and file contents from `obj`.

    The mtime comes from obj['modified_at'] (converted with
    convertTime) when that key is present and is 0 otherwise.  The
    contents become obj serialized as JSON with a 4-space indent and
    sorted keys, followed by a newline.
    """
    if 'modified_at' in obj:
        self._mtime = convertTime(obj['modified_at'])
    else:
        self._mtime = 0
    serialized = json.dumps(obj, indent=4, sort_keys=True)
    self.contents = serialized + "\n"
180 class Directory(FreshBase):
181 '''Generic directory object, backed by a dict.
182 Consists of a set of entries with the key representing the filename
183 and the value referencing a File or Directory object.
186 def __init__(self, parent_inode):
187 super(Directory, self).__init__()
189 '''parent_inode is the integer inode number'''
191 if not isinstance(parent_inode, int):
192 raise Exception("parent_inode should be an int")
193 self.parent_inode = parent_inode
195 self._mtime = time.time()
197 # Overridden by subclasses to implement logic to update the entries dict
198 # when the directory is stale
202 # Only used when computing the size of the disk footprint of the directory
207 def checkupdate(self):
211 except apiclient.errors.HttpError as e:
214 def __getitem__(self, item):
216 return self._entries[item]
220 return self._entries.items()
224 return self._entries.iterkeys()
226 def __contains__(self, k):
228 return k in self._entries
230 def merge(self, items, fn, same, new_entry):
231 '''Helper method for updating the contents of the directory. Takes a list
232 describing the new contents of the directory, reuse entries that are
233 the same in both the old and new lists, create new entries, and delete
234 old entries missing from the new list.
236 items: iterable with new directory contents
238 fn: function to take an entry in 'items' and return the desired file or
239 directory name, or None if this entry should be skipped
241 same: function to compare an existing entry (a File or Directory
242 object) with an entry in the items list to determine whether to keep
245 new_entry: function to create a new directory entry (File or Directory
246 object) from an entry in the items list.
250 oldentries = self._entries
254 name = sanitize_filename(fn(i))
256 if name in oldentries and same(oldentries[name], i):
257 # move existing directory entry over
258 self._entries[name] = oldentries[name]
261 # create new directory entry
264 self._entries[name] = self.inodes.add_entry(ent)
267 # delete any other directory entries that were not found in 'items'
269 llfuse.invalidate_entry(self.inode, str(i))
270 self.inodes.del_entry(oldentries[i])
274 self._mtime = time.time()
279 '''Delete all entries'''
280 oldentries = self._entries
283 if isinstance(n, Directory):
285 llfuse.invalidate_entry(self.inode, str(n))
286 self.inodes.del_entry(oldentries[n])
287 llfuse.invalidate_inode(self.inode)
294 class CollectionDirectory(Directory):
295 '''Represents the root of a directory tree holding a collection.'''
297 def __init__(self, parent_inode, inodes, api, num_retries, collection):
298 super(CollectionDirectory, self).__init__(parent_inode)
301 self.num_retries = num_retries
302 self.collection_object_file = None
303 self.collection_object = None
304 if isinstance(collection, dict):
305 self.collection_locator = collection['uuid']
306 self._mtime = convertTime(collection.get('modified_at'))
308 self.collection_locator = collection
312 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
314 # Used by arv-web.py to switch the contents of the CollectionDirectory
315 def change_collection(self, new_locator):
316 """Switch the contents of the CollectionDirectory. Must be called with llfuse.lock held."""
317 self.collection_locator = new_locator
318 self.collection_object = None
321 def new_collection(self, new_collection_object, coll_reader):
322 self.collection_object = new_collection_object
324 self._mtime = convertTime(self.collection_object.get('modified_at'))
326 if self.collection_object_file is not None:
327 self.collection_object_file.update(self.collection_object)
330 for s in coll_reader.all_streams():
332 for part in s.name().split('/'):
333 if part != '' and part != '.':
334 partname = sanitize_filename(part)
335 if partname not in cwd._entries:
336 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
337 cwd = cwd._entries[partname]
338 for k, v in s.files().items():
339 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
343 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
346 if self.collection_locator is None:
350 with llfuse.lock_released:
351 coll_reader = arvados.CollectionReader(
352 self.collection_locator, self.api, self.api.localkeep(),
353 num_retries=self.num_retries)
354 new_collection_object = coll_reader.api_response() or {}
355 # If the Collection only exists in Keep, there will be no API
356 # response. Fill in the fields we need.
357 if 'uuid' not in new_collection_object:
358 new_collection_object['uuid'] = self.collection_locator
359 if "portable_data_hash" not in new_collection_object:
360 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
361 if 'manifest_text' not in new_collection_object:
362 new_collection_object['manifest_text'] = coll_reader.manifest_text()
363 coll_reader.normalize()
364 # end with llfuse.lock_released, re-acquire lock
366 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
367 self.new_collection(new_collection_object, coll_reader)
371 except arvados.errors.NotFoundError:
372 _logger.exception("arv-mount %s: error", self.collection_locator)
373 except arvados.errors.ArgumentError as detail:
374 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
375 if self.collection_object is not None and "manifest_text" in self.collection_object:
376 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
378 _logger.exception("arv-mount %s: error", self.collection_locator)
379 if self.collection_object is not None and "manifest_text" in self.collection_object:
380 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
383 def __getitem__(self, item):
385 if item == '.arvados#collection':
386 if self.collection_object_file is None:
387 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
388 self.inodes.add_entry(self.collection_object_file)
389 return self.collection_object_file
391 return super(CollectionDirectory, self).__getitem__(item)
393 def __contains__(self, k):
394 if k == '.arvados#collection':
397 return super(CollectionDirectory, self).__contains__(k)
400 class MagicDirectory(Directory):
401 '''A special directory that logically contains the set of all extant keep
402 locators. When a file is referenced by lookup(), it is tested to see if it
403 is a valid keep locator to a manifest, and if so, loads the manifest
404 contents as a subdirectory of this directory with the locator as the
405 directory name. Since querying a list of all extant keep locators is
406 impractical, only collections that have already been accessed are visible
411 This directory provides access to Arvados collections as subdirectories listed
412 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
413 the form '1234567890abcdefghijklmnopqrstuv+123').
415 Note that this directory will appear empty until you attempt to access a
416 specific collection subdirectory (such as trying to 'cd' into it), at which
417 point the collection will actually be looked up on the server and the directory
418 will appear if it exists.
421 def __init__(self, parent_inode, inodes, api, num_retries):
422 super(MagicDirectory, self).__init__(parent_inode)
425 self.num_retries = num_retries
def __setattr__(self, name, value):
    """Set the attribute, then populate the directory once it gets an inode.

    When 'inode' is assigned a non-None value while the directory has
    no entries yet, a README file is added.  If that inode is the FUSE
    root inode, a second MagicDirectory is also added under 'by_id'.
    """
    super(MagicDirectory, self).__setattr__(name, value)

    # Nothing more to do unless an inode was just assigned to a
    # directory that is still empty.
    if name != 'inode' or self.inode is None or self._entries:
        return

    # When we're assigned an inode, add a README.
    readme = StringFile(self.inode, self.README_TEXT, time.time())
    self._entries['README'] = self.inodes.add_entry(readme)

    if self.inode != llfuse.ROOT_INODE:
        return

    # If we're the root directory, add an identical by_id subdirectory.
    by_id = MagicDirectory(
        self.inode, self.inodes, self.api, self.num_retries)
    self._entries['by_id'] = self.inodes.add_entry(by_id)
439 def __contains__(self, k):
440 if k in self._entries:
443 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
447 e = self.inodes.add_entry(CollectionDirectory(
448 self.inode, self.inodes, self.api, self.num_retries, k))
454 except Exception as e:
455 _logger.debug('arv-mount exception keep %s', e)
458 def __getitem__(self, item):
460 return self._entries[item]
462 raise KeyError("No collection with id " + item)
465 class RecursiveInvalidateDirectory(Directory):
466 def invalidate(self):
467 if self.inode == llfuse.ROOT_INODE:
468 llfuse.lock.acquire()
470 super(RecursiveInvalidateDirectory, self).invalidate()
471 for a in self._entries:
472 self._entries[a].invalidate()
476 if self.inode == llfuse.ROOT_INODE:
477 llfuse.lock.release()
480 class TagsDirectory(RecursiveInvalidateDirectory):
481 '''A special directory that contains as subdirectories all tags visible to the user.'''
483 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
484 super(TagsDirectory, self).__init__(parent_inode)
487 self.num_retries = num_retries
489 self._poll_time = poll_time
492 with llfuse.lock_released:
493 tags = self.api.links().list(
494 filters=[['link_class', '=', 'tag']],
495 select=['name'], distinct=True
496 ).execute(num_retries=self.num_retries)
498 self.merge(tags['items'],
500 lambda a, i: a.tag == i['name'],
501 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
504 class TagDirectory(Directory):
505 '''A special directory that contains as subdirectories all collections visible
506 to the user that are tagged with a particular tag.
509 def __init__(self, parent_inode, inodes, api, num_retries, tag,
510 poll=False, poll_time=60):
511 super(TagDirectory, self).__init__(parent_inode)
514 self.num_retries = num_retries
517 self._poll_time = poll_time
520 with llfuse.lock_released:
521 taggedcollections = self.api.links().list(
522 filters=[['link_class', '=', 'tag'],
523 ['name', '=', self.tag],
524 ['head_uuid', 'is_a', 'arvados#collection']],
526 ).execute(num_retries=self.num_retries)
527 self.merge(taggedcollections['items'],
528 lambda i: i['head_uuid'],
529 lambda a, i: a.collection_locator == i['head_uuid'],
530 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
533 class ProjectDirectory(Directory):
534 '''A special directory that contains the contents of a project.'''
536 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
537 poll=False, poll_time=60):
538 super(ProjectDirectory, self).__init__(parent_inode)
541 self.num_retries = num_retries
542 self.project_object = project_object
543 self.project_object_file = None
544 self.uuid = project_object['uuid']
546 self._poll_time = poll_time
548 def createDirectory(self, i):
549 if collection_uuid_pattern.match(i['uuid']):
550 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
551 elif group_uuid_pattern.match(i['uuid']):
552 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
553 elif link_uuid_pattern.match(i['uuid']):
554 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
555 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
558 elif uuid_pattern.match(i['uuid']):
559 return ObjectFile(self.parent_inode, i)
564 if self.project_object_file == None:
565 self.project_object_file = ObjectFile(self.inode, self.project_object)
566 self.inodes.add_entry(self.project_object_file)
570 if i['name'] is None or len(i['name']) == 0:
572 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
573 # collection or subproject
575 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
578 elif 'kind' in i and i['kind'].startswith('arvados#'):
580 return "{}.{}".format(i['name'], i['kind'][8:])
585 if isinstance(a, CollectionDirectory):
586 return a.collection_locator == i['uuid']
587 elif isinstance(a, ProjectDirectory):
588 return a.uuid == i['uuid']
589 elif isinstance(a, ObjectFile):
590 return a.uuid == i['uuid'] and not a.stale()
593 with llfuse.lock_released:
594 if group_uuid_pattern.match(self.uuid):
595 self.project_object = self.api.groups().get(
596 uuid=self.uuid).execute(num_retries=self.num_retries)
597 elif user_uuid_pattern.match(self.uuid):
598 self.project_object = self.api.users().get(
599 uuid=self.uuid).execute(num_retries=self.num_retries)
601 contents = arvados.util.list_all(self.api.groups().contents,
602 self.num_retries, uuid=self.uuid)
603 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
604 contents += arvados.util.list_all(
605 self.api.links().list, self.num_retries,
606 filters=[['tail_uuid', '=', self.uuid],
607 ['link_class', '=', 'name']])
609 # end with llfuse.lock_released, re-acquire lock
614 self.createDirectory)
616 def __getitem__(self, item):
618 if item == '.arvados#project':
619 return self.project_object_file
621 return super(ProjectDirectory, self).__getitem__(item)
623 def __contains__(self, k):
624 if k == '.arvados#project':
627 return super(ProjectDirectory, self).__contains__(k)
630 class SharedDirectory(Directory):
631 '''A special directory that represents users or groups who have shared projects with me.'''
633 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
634 poll=False, poll_time=60):
635 super(SharedDirectory, self).__init__(parent_inode)
638 self.num_retries = num_retries
639 self.current_user = api.users().current().execute(num_retries=num_retries)
641 self._poll_time = poll_time
644 with llfuse.lock_released:
645 all_projects = arvados.util.list_all(
646 self.api.groups().list, self.num_retries,
647 filters=[['group_class','=','project']])
649 for ob in all_projects:
650 objects[ob['uuid']] = ob
654 for ob in all_projects:
655 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
657 root_owners[ob['owner_uuid']] = True
659 lusers = arvados.util.list_all(
660 self.api.users().list, self.num_retries,
661 filters=[['uuid','in', list(root_owners)]])
662 lgroups = arvados.util.list_all(
663 self.api.groups().list, self.num_retries,
664 filters=[['uuid','in', list(root_owners)]])
670 objects[l["uuid"]] = l
672 objects[l["uuid"]] = l
675 for r in root_owners:
679 contents[obr["name"]] = obr
680 if "first_name" in obr:
681 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
684 if r['owner_uuid'] not in objects:
685 contents[r['name']] = r
687 # end with llfuse.lock_released, re-acquire lock
690 self.merge(contents.items(),
692 lambda a, i: a.uuid == i[1]['uuid'],
693 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
698 class FileHandle(object):
699 '''Connects a numeric file handle to a File or Directory object that has
700 been opened by the client.'''
702 def __init__(self, fh, entry):
707 class Inodes(object):
708 '''Manage the set of inodes. This is the mapping from a numeric id
709 to a concrete File or Directory object'''
713 self._counter = itertools.count(llfuse.ROOT_INODE)
def __getitem__(self, item):
    """Return the entry stored under the key `item` in self._entries.

    A missing key propagates whatever error the underlying container
    raises for it.
    """
    entries = self._entries
    return entries[item]
def __setitem__(self, key, item):
    """Store `item` under `key` in self._entries, replacing any previous value."""
    entries = self._entries
    entries[key] = item
722 return self._entries.iterkeys()
725 return self._entries.items()
def __contains__(self, k):
    """Report whether `k` is a key of self._entries."""
    present = k in self._entries
    return present
730 def add_entry(self, entry):
731 entry.inode = next(self._counter)
732 self._entries[entry.inode] = entry
def del_entry(self, entry):
    """Invalidate `entry`'s inode via llfuse and drop it from self._entries.

    The entry is looked up by entry.inode; removing an inode that is
    not registered propagates the container's lookup error.
    """
    inode = entry.inode
    # Invalidate first, then forget the entry, as the original order does.
    llfuse.invalidate_inode(inode)
    del self._entries[inode]
739 class Operations(llfuse.Operations):
740 '''This is the main interface with llfuse. The methods on this object are
741 called by llfuse threads to service FUSE events to query and read from
744 llfuse has its own global lock which is acquired before calling a request handler,
745 so request handlers do not run concurrently unless the lock is explicitly released
746 using "with llfuse.lock_released:"'''
748 def __init__(self, uid, gid, encoding="utf-8"):
749 super(Operations, self).__init__()
751 self.inodes = Inodes()
754 self.encoding = encoding
756 # dict of inode to filehandle
757 self._filehandles = {}
758 self._filehandles_counter = 1
760 # Other threads that need to wait until the fuse driver
761 # is fully initialized should wait() on this event object.
762 self.initlock = threading.Event()
765 # Allow threads that are waiting for the driver to be finished
766 # initializing to continue
769 def access(self, inode, mode, ctx):
772 def getattr(self, inode):
773 if inode not in self.inodes:
774 raise llfuse.FUSEError(errno.ENOENT)
776 e = self.inodes[inode]
778 entry = llfuse.EntryAttributes()
781 entry.entry_timeout = 300
782 entry.attr_timeout = 300
784 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
785 if isinstance(e, Directory):
786 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
787 elif isinstance(e, StreamReaderFile):
788 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
790 entry.st_mode |= stat.S_IFREG
793 entry.st_uid = self.uid
794 entry.st_gid = self.gid
797 entry.st_size = e.size()
799 entry.st_blksize = 512
800 entry.st_blocks = (e.size()/512)+1
801 entry.st_atime = int(e.atime())
802 entry.st_mtime = int(e.mtime())
803 entry.st_ctime = int(e.mtime())
807 def lookup(self, parent_inode, name):
808 name = unicode(name, self.encoding)
809 _logger.debug("arv-mount lookup: parent_inode %i name %s",
816 if parent_inode in self.inodes:
817 p = self.inodes[parent_inode]
819 inode = p.parent_inode
820 elif isinstance(p, Directory) and name in p:
821 inode = p[name].inode
824 return self.getattr(inode)
826 raise llfuse.FUSEError(errno.ENOENT)
828 def open(self, inode, flags):
829 if inode in self.inodes:
830 p = self.inodes[inode]
832 raise llfuse.FUSEError(errno.ENOENT)
834 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
835 raise llfuse.FUSEError(errno.EROFS)
837 if isinstance(p, Directory):
838 raise llfuse.FUSEError(errno.EISDIR)
840 fh = self._filehandles_counter
841 self._filehandles_counter += 1
842 self._filehandles[fh] = FileHandle(fh, p)
845 def read(self, fh, off, size):
846 _logger.debug("arv-mount read %i %i %i", fh, off, size)
847 if fh in self._filehandles:
848 handle = self._filehandles[fh]
850 raise llfuse.FUSEError(errno.EBADF)
853 handle.entry._atime = time.time()
856 with llfuse.lock_released:
857 return handle.entry.readfrom(off, size)
858 except arvados.errors.NotFoundError as e:
859 _logger.warning("Block not found: " + str(e))
860 raise llfuse.FUSEError(errno.EIO)
863 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    """Forget the file handle `fh`.

    Handles that are not in self._filehandles are ignored.
    """
    try:
        del self._filehandles[fh]
    except KeyError:
        # Unknown handle: nothing to release.
        pass
869 def opendir(self, inode):
870 _logger.debug("arv-mount opendir: inode %i", inode)
872 if inode in self.inodes:
873 p = self.inodes[inode]
875 raise llfuse.FUSEError(errno.ENOENT)
877 if not isinstance(p, Directory):
878 raise llfuse.FUSEError(errno.ENOTDIR)
880 fh = self._filehandles_counter
881 self._filehandles_counter += 1
882 if p.parent_inode in self.inodes:
883 parent = self.inodes[p.parent_inode]
885 raise llfuse.FUSEError(errno.EIO)
888 p._atime = time.time()
890 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
893 def readdir(self, fh, off):
894 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
896 if fh in self._filehandles:
897 handle = self._filehandles[fh]
899 raise llfuse.FUSEError(errno.EBADF)
901 _logger.debug("arv-mount handle.entry %s", handle.entry)
904 while e < len(handle.entry):
905 if handle.entry[e][1].inode in self.inodes:
907 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
908 except UnicodeEncodeError:
def releasedir(self, fh):
    """Forget the directory handle `fh`.

    Handles that are not in self._filehandles are ignored, matching
    release(): an unconditional `del` raised KeyError for an unknown
    handle instead of quietly doing nothing.
    """
    self._filehandles.pop(fh, None)
916 st = llfuse.StatvfsData()
917 st.f_bsize = 64 * 1024
930 # The llfuse documentation recommends only overloading functions that
931 # are actually implemented, as the default implementation will raise ENOSYS.
932 # However, there is a bug in the llfuse default implementation of create()
933 # "create() takes exactly 5 positional arguments (6 given)" which will crash
935 # The workaround is to implement it with the proper number of parameters,
936 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    """Refuse to create files: always raises llfuse.FUSEError(errno.EROFS).

    The five positional parameters are unused; they exist only so the
    method accepts the number of arguments llfuse passes (see the
    comment preceding this method).
    """
    read_only_error = llfuse.FUSEError(errno.EROFS)
    raise read_only_error