2 # FUSE driver for Arvados Keep
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# Module-level logger under the "arvados" logging namespace.
26 _logger = logging.getLogger('arvados.arvados_fuse')
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
# NUL and '/' are the bytes a Linux filename component cannot contain.
31 _disallowed_filename_characters = re.compile('[\x00/]')
33 class SafeApi(object):
34     '''Threadsafe wrapper for API object. This stores and returns a different api
35     object per thread, because httplib2 which underlies apiclient is not
# NOTE(review): docstring continuation elided in this listing.
39     def __init__(self, config):
# config: object exposing .get(name) and .flag_is_true(name) for Arvados
# settings -- presumably arvados.config; verify against caller.
40         self.host = config.get('ARVADOS_API_HOST')
41         self.api_token = config.get('ARVADOS_API_TOKEN')
42         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
# threading.local() holds the per-thread api/keep clients created lazily below.
43         self.local = threading.local()
# One block cache shared by every per-thread KeepClient.
44         self.block_cache = arvados.KeepBlockCache()
# Lazily create one API client per thread (method header elided here).
47         if 'api' not in self.local.__dict__:
48             self.local.api = arvados.api('v1', False, self.host,
49                                          self.api_token, self.insecure)
# Lazily create one Keep client per thread, sharing block_cache
# (method header and localapi return elided here).
53         if 'keep' not in self.local.__dict__:
54             self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
55         return self.local.keep
57     def __getattr__(self, name):
58         # Proxy nonexistent attributes to the local API client.
60             return getattr(self.localapi(), name)
61         except AttributeError:
62             return super(SafeApi, self).__getattr__(name)
# NOTE(review): this is the interior of convertTime(t); the def line and the
# except-branch return are elided in this listing.  Parses an Arvados
# ISO-8601 UTC timestamp string into Unix epoch seconds; TypeError/ValueError
# covers None and malformed input.
66     '''Parse Arvados timestamp to unix time.'''
68         return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
69     except (TypeError, ValueError):
72 def sanitize_filename(dirty):
73     '''Replace disallowed filename characters with harmless "_".'''
# NOTE(review): lines 74-82 are elided here; presumably they special-case
# values such as None/''/'.'/'..' -- confirm before relying on this fallback.
# Substitutes '_' for every character matched by _disallowed_filename_characters.
83     return _disallowed_filename_characters.sub('_', dirty)
86 class FreshBase(object):
87     '''Base class for maintaining fresh/stale state to determine when to update.'''
# __init__ header elided: records when the value was last refreshed and
# last accessed.
91         self._last_update = time.time()
92         self._atime = time.time()
95     # Mark the value as stale
99     # Test if the entries dict is stale.
# Stale when the entry has been accessed more recently than
# last-update + poll interval (method header elided).
104         return (self._last_update + self._poll_time) < self._atime
# Record a successful refresh (method header elided).
109         self._last_update = time.time()
114 class File(FreshBase):
115     '''Base for file objects.'''
117     def __init__(self, parent_inode, _mtime=0):
118         super(File, self).__init__()
# parent_inode: inode number of the containing Directory.
120         self.parent_inode = parent_inode
# Read up to `size` bytes starting at byte offset `off`.
# NOTE(review): base-class body elided -- presumably returns empty; confirm.
126     def readfrom(self, off, size):
133 class StreamReaderFile(File):
134     '''Wraps a StreamFileReader as a file.'''
136     def __init__(self, parent_inode, reader, _mtime):
137         super(StreamReaderFile, self).__init__(parent_inode, _mtime)
# size() method header elided: delegates to the underlying stream reader.
141         return self.reader.size()
143     def readfrom(self, off, size):
# Delegate the byte-range read to the wrapped StreamFileReader.
144         return self.reader.readfrom(off, size)
150 class StringFile(File):
151     '''Wrap a simple string as a file'''
152     def __init__(self, parent_inode, contents, _mtime):
153         super(StringFile, self).__init__(parent_inode, _mtime)
# contents: the full file body held in memory as a string.
154         self.contents = contents
# size() method header elided: length of the in-memory string.
157         return len(self.contents)
159     def readfrom(self, off, size):
# Plain slice; Python slicing clamps out-of-range offsets safely.
160         return self.contents[off:(off+size)]
163 class ObjectFile(StringFile):
164     '''Wrap a dict as a serialized json object.'''
166     def __init__(self, parent_inode, obj):
# Start with empty contents/mtime; the elided tail of __init__ presumably
# calls update(obj) to fill them in -- confirm against full source.
167         super(ObjectFile, self).__init__(parent_inode, "", 0)
168         self.uuid = obj['uuid']
171     def update(self, obj):
# mtime tracks the API record's modified_at (0 when absent).
172         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
# Stable, human-readable serialization: sorted keys, 4-space indent.
173         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
176 class Directory(FreshBase):
177     '''Generic directory object, backed by a dict.
178     Consists of a set of entries with the key representing the filename
179     and the value referencing a File or Directory object.
182     def __init__(self, parent_inode):
183         super(Directory, self).__init__()
185         '''parent_inode is the integer inode number'''
187         if not isinstance(parent_inode, int):
188             raise Exception("parent_inode should be an int")
189         self.parent_inode = parent_inode
191         self._mtime = time.time()
193     # Overridden by subclasses to implement logic to update the entries dict
194     # when the directory is stale
198     # Only used when computing the size of the disk footprint of the directory
203     def checkupdate(self):
# Refresh-if-stale wrapper (interior elided); API failures are logged
# rather than propagated to the FUSE caller.
207         except apiclient.errors.HttpError as e:
210     def __getitem__(self, item):
# Lookup body elided; delegates to the backing dict.
212         return self._entries[item]
# items()/iterkeys() method headers elided: expose the backing dict's views.
216         return self._entries.items()
220         return self._entries.iterkeys()
222     def __contains__(self, k):
224         return k in self._entries
226     def merge(self, items, fn, same, new_entry):
227         '''Helper method for updating the contents of the directory. Takes a list
228         describing the new contents of the directory, reuse entries that are
229         the same in both the old and new lists, create new entries, and delete
230         old entries missing from the new list.
232         items: iterable with new directory contents
234         fn: function to take an entry in 'items' and return the desired file or
235         directory name, or None if this entry should be skipped
237         same: function to compare an existing entry (a File or Directory
238         object) with an entry in the items list to determine whether to keep
241         new_entry: function to create a new directory entry (File or Directory
242         object) from an entry in the items list.
# Swap in a fresh dict; surviving entries are moved back over below.
246         oldentries = self._entries
250             name = sanitize_filename(fn(i))
252                 if name in oldentries and same(oldentries[name], i):
253                     # move existing directory entry over
254                     self._entries[name] = oldentries[name]
257                     # create new directory entry
260                     self._entries[name] = self.inodes.add_entry(ent)
263         # delete any other directory entries that were not found in 'items'
265             llfuse.invalidate_entry(self.inode, str(i))
266             self.inodes.del_entry(oldentries[i])
# Bump mtime (guard condition elided -- presumably only when contents changed).
270             self._mtime = time.time()
275         '''Delete all entries'''
276         oldentries = self._entries
# Recursively clear subdirectories before invalidating their dentries.
279             if isinstance(n, Directory):
281             llfuse.invalidate_entry(self.inode, str(n))
282             self.inodes.del_entry(oldentries[n])
283         llfuse.invalidate_inode(self.inode)
290 class CollectionDirectory(Directory):
291     '''Represents the root of a directory tree holding a collection.'''
293     def __init__(self, parent_inode, inodes, api, num_retries, collection):
294         super(CollectionDirectory, self).__init__(parent_inode)
297         self.num_retries = num_retries
# Lazily-created ObjectFile exposing the API record as '.arvados#collection'.
298         self.collection_object_file = None
299         self.collection_object = None
# Accept either a full API record (dict) or a bare locator string.
300         if isinstance(collection, dict):
301             self.collection_locator = collection['uuid']
303             self.collection_locator = collection
# same() body (header elided): a record matches if either its uuid or its
# portable data hash equals our locator.
306         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
308     # Used by arv-web.py to switch the contents of the CollectionDirectory
309     def change_collection(self, new_locator):
310         """Switch the contents of the CollectionDirectory. Must be called with llfuse.lock held."""
311         self.collection_locator = new_locator
# Clearing collection_object forces the next update() to re-fetch.
312         self.collection_object = None
315     def new_collection(self, new_collection_object, coll_reader):
316         self.collection_object = new_collection_object
318         if self.collection_object_file is not None:
319             self.collection_object_file.update(self.collection_object)
# Build the directory tree from the manifest: one Directory per path
# component of each stream name, then the stream's files as leaves.
322         for s in coll_reader.all_streams():
324             for part in s.name().split('/'):
325                 if part != '' and part != '.':
326                     partname = sanitize_filename(part)
327                     if partname not in cwd._entries:
328                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
329                     cwd = cwd._entries[partname]
330             for k, v in s.files().items():
331                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
# update() interior: a collection addressed by portable data hash is
# immutable, so once loaded it never needs re-fetching.
335         if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
338         if self.collection_locator is None:
# Release the global llfuse lock around network I/O so other FUSE
# requests can proceed while we fetch the collection.
342                 with llfuse.lock_released:
343                     coll_reader = arvados.CollectionReader(
344                         self.collection_locator, self.api, self.api.localkeep(),
345                         num_retries=self.num_retries)
346                     new_collection_object = coll_reader.api_response() or {}
347                     # If the Collection only exists in Keep, there will be no API
348                     # response. Fill in the fields we need.
349                     if 'uuid' not in new_collection_object:
350                         new_collection_object['uuid'] = self.collection_locator
351                     if "portable_data_hash" not in new_collection_object:
352                         new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
353                     if 'manifest_text' not in new_collection_object:
354                         new_collection_object['manifest_text'] = coll_reader.manifest_text()
355                     coll_reader.normalize()
356                 # end with llfuse.lock_released, re-acquire lock
# Only rebuild the tree when the content hash actually changed.
358                 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
359                     self.new_collection(new_collection_object, coll_reader)
363         except arvados.errors.NotFoundError:
364             _logger.exception("arv-mount %s: error", self.collection_locator)
365         except arvados.errors.ArgumentError as detail:
366             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
367             if self.collection_object is not None and "manifest_text" in self.collection_object:
368                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
# Catch-all handler (except line elided): log and fall through so a bad
# collection doesn't take down the mount.
370             _logger.exception("arv-mount %s: error", self.collection_locator)
371             if self.collection_object is not None and "manifest_text" in self.collection_object:
372                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
375     def __getitem__(self, item):
# Special virtual file exposing the raw API record as JSON.
377             if item == '.arvados#collection':
378                 if self.collection_object_file is None:
379                     self.collection_object_file = ObjectFile(self.inode, self.collection_object)
380                     self.inodes.add_entry(self.collection_object_file)
381                 return self.collection_object_file
383                 return super(CollectionDirectory, self).__getitem__(item)
385     def __contains__(self, k):
386         if k == '.arvados#collection':
389             return super(CollectionDirectory, self).__contains__(k)
# mtime() body (header elided): collection's modified_at, or 0 when unknown.
393         return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
396 class MagicDirectory(Directory):
397     '''A special directory that logically contains the set of all extant keep
398     locators. When a file is referenced by lookup(), it is tested to see if it
399     is a valid keep locator to a manifest, and if so, loads the manifest
400     contents as a subdirectory of this directory with the locator as the
401     directory name. Since querying a list of all extant keep locators is
402     impractical, only collections that have already been accessed are visible
# README_TEXT class attribute (elided) documents the directory for users:
407     This directory provides access to Arvados collections as subdirectories listed
408     by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
409     the form '1234567890abcdefghijklmnopqrstuv+123').
411     Note that this directory will appear empty until you attempt to access a
412     specific collection subdirectory (such as trying to 'cd' into it), at which
413     point the collection will actually be looked up on the server and the directory
414     will appear if it exists.
417     def __init__(self, parent_inode, inodes, api, num_retries):
418         super(MagicDirectory, self).__init__(parent_inode)
421         self.num_retries = num_retries
423     def __setattr__(self, name, value):
424         super(MagicDirectory, self).__setattr__(name, value)
425         # When we're assigned an inode, add a README.
# Hooking __setattr__ is needed because the inode is assigned by
# Inodes.add_entry() after construction, not in __init__.
426         if ((name == 'inode') and (self.inode is not None) and
427             (not self._entries)):
428             self._entries['README'] = self.inodes.add_entry(
429                 StringFile(self.inode, self.README_TEXT, time.time()))
430             # If we're the root directory, add an identical by_id subdirectory.
431             if self.inode == llfuse.ROOT_INODE:
432                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
433                     self.inode, self.inodes, self.api, self.num_retries))
435     def __contains__(self, k):
436         if k in self._entries:
# Only names shaped like a portable data hash or a uuid can possibly be
# collections; reject anything else without a server round-trip.
439         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
# Side-effecting membership test: probing a valid locator instantiates the
# CollectionDirectory and caches it in _entries (interior partially elided).
443             e = self.inodes.add_entry(CollectionDirectory(
444                 self.inode, self.inodes, self.api, self.num_retries, k))
450         except Exception as e:
451             _logger.debug('arv-mount exception keep %s', e)
454     def __getitem__(self, item):
# Relies on __contains__ above to have populated _entries on demand.
456             return self._entries[item]
458             raise KeyError("No collection with id " + item)
# Directory subclass whose invalidate() also invalidates every child entry.
461 class RecursiveInvalidateDirectory(Directory):
462     def invalidate(self):
# The root is invalidated from outside a FUSE handler, so it must take the
# global llfuse lock itself; children are reached with the lock already held.
463         if self.inode == llfuse.ROOT_INODE:
464             llfuse.lock.acquire()
466             super(RecursiveInvalidateDirectory, self).invalidate()
467             for a in self._entries:
468                 self._entries[a].invalidate()
# Matching release in a finally-style tail (surrounding lines elided).
472             if self.inode == llfuse.ROOT_INODE:
473                 llfuse.lock.release()
476 class TagsDirectory(RecursiveInvalidateDirectory):
477     '''A special directory that contains as subdirectories all tags visible to the user.'''
479     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
480         super(TagsDirectory, self).__init__(parent_inode)
483         self.num_retries = num_retries
485         self._poll_time = poll_time
# update() interior: fetch the distinct tag names without holding the
# global FUSE lock during the API call.
488         with llfuse.lock_released:
489             tags = self.api.links().list(
490                 filters=[['link_class', '=', 'tag']],
491                 select=['name'], distinct=True
492                 ).execute(num_retries=self.num_retries)
# Reconcile entries: one TagDirectory per tag name (name-extractor
# lambda elided between these lines).
494         self.merge(tags['items'],
496                    lambda a, i: a.tag == i['name'],
497                    lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
500 class TagDirectory(Directory):
501     '''A special directory that contains as subdirectories all collections visible
502     to the user that are tagged with a particular tag.
505     def __init__(self, parent_inode, inodes, api, num_retries, tag,
506                  poll=False, poll_time=60):
507         super(TagDirectory, self).__init__(parent_inode)
510         self.num_retries = num_retries
513         self._poll_time = poll_time
# update() interior: list tag links pointing at collections, outside the
# global FUSE lock.
516         with llfuse.lock_released:
517             taggedcollections = self.api.links().list(
518                 filters=[['link_class', '=', 'tag'],
519                          ['name', '=', self.tag],
520                          ['head_uuid', 'is_a', 'arvados#collection']],
522                 ).execute(num_retries=self.num_retries)
# One CollectionDirectory per tagged collection, keyed by head_uuid;
# reuse an entry only if its locator still matches.
523         self.merge(taggedcollections['items'],
524                    lambda i: i['head_uuid'],
525                    lambda a, i: a.collection_locator == i['head_uuid'],
526                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
529 class ProjectDirectory(Directory):
530     '''A special directory that contains the contents of a project.'''
532     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
533                  poll=False, poll_time=60):
534         super(ProjectDirectory, self).__init__(parent_inode)
537         self.num_retries = num_retries
538         self.project_object = project_object
# Lazily-created ObjectFile exposing the project record as '.arvados#project'.
539         self.project_object_file = None
540         self.uuid = project_object['uuid']
542         self._poll_time = poll_time
544     def createDirectory(self, i):
# Map an API record to the right entry type by the shape of its uuid.
545         if collection_uuid_pattern.match(i['uuid']):
546             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
547         elif group_uuid_pattern.match(i['uuid']):
# Subprojects recurse with the same polling configuration.
548             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
549         elif link_uuid_pattern.match(i['uuid']):
550             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
# Name links (pre-#3036) resolve to the collection they point at.
551                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
554         elif uuid_pattern.match(i['uuid']):
# Any other Arvados object is shown as its JSON serialization.
555             return ObjectFile(self.parent_inode, i)
# update() interior begins (header elided).
560         if self.project_object_file == None:
561             self.project_object_file = ObjectFile(self.inode, self.project_object)
562             self.inodes.add_entry(self.project_object_file)
# namefn (header elided): derive the display name for a record.
566             if i['name'] is None or len(i['name']) == 0:
568             elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
569                 # collection or subproject
571             elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
574             elif 'kind' in i and i['kind'].startswith('arvados#'):
# Strip the 'arvados#' prefix to build e.g. 'foo.specimen'.
576                 return "{}.{}".format(i['name'], i['kind'][8:])
# samefn (header elided): decide whether an existing entry still matches.
581             if isinstance(a, CollectionDirectory):
582                 return a.collection_locator == i['uuid']
583             elif isinstance(a, ProjectDirectory):
584                 return a.uuid == i['uuid']
585             elif isinstance(a, ObjectFile):
# ObjectFiles additionally refresh when their cached record goes stale.
586                 return a.uuid == i['uuid'] and not a.stale()
589         with llfuse.lock_released:
# A project is either a group or a user's home project.
590             if group_uuid_pattern.match(self.uuid):
591                 self.project_object = self.api.groups().get(
592                     uuid=self.uuid).execute(num_retries=self.num_retries)
593             elif user_uuid_pattern.match(self.uuid):
594                 self.project_object = self.api.users().get(
595                     uuid=self.uuid).execute(num_retries=self.num_retries)
597             contents = arvados.util.list_all(self.api.groups().contents,
598                                              self.num_retries, uuid=self.uuid)
599             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
600             contents += arvados.util.list_all(
601                 self.api.links().list, self.num_retries,
602                 filters=[['tail_uuid', '=', self.uuid],
603                          ['link_class', '=', 'name']])
605         # end with llfuse.lock_released, re-acquire lock
# merge() call partially elided; createDirectory builds new entries.
610                    self.createDirectory)
612     def __getitem__(self, item):
614         if item == '.arvados#project':
615             return self.project_object_file
617             return super(ProjectDirectory, self).__getitem__(item)
619     def __contains__(self, k):
620         if k == '.arvados#project':
623             return super(ProjectDirectory, self).__contains__(k)
626 class SharedDirectory(Directory):
627     '''A special directory that represents users or groups who have shared projects with me.'''
629     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
630                  poll=False, poll_time=60):
631         super(SharedDirectory, self).__init__(parent_inode)
634         self.num_retries = num_retries
# NOTE(review): this API call runs in __init__, before the mount loop starts.
635         self.current_user = api.users().current().execute(num_retries=num_retries)
637         self._poll_time = poll_time
# update() interior: gather every visible project, then find the "roots" --
# projects whose owner is neither me nor another visible project.
640         with llfuse.lock_released:
641             all_projects = arvados.util.list_all(
642                 self.api.groups().list, self.num_retries,
643                 filters=[['group_class','=','project']])
645             for ob in all_projects:
646                 objects[ob['uuid']] = ob
650             for ob in all_projects:
651                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
653                     root_owners[ob['owner_uuid']] = True
# Resolve the root owners, which may be users or groups.
655             lusers = arvados.util.list_all(
656                 self.api.users().list, self.num_retries,
657                 filters=[['uuid','in', list(root_owners)]])
658             lgroups = arvados.util.list_all(
659                 self.api.groups().list, self.num_retries,
660                 filters=[['uuid','in', list(root_owners)]])
# Index owner records by uuid (loop headers elided).
666                 objects[l["uuid"]] = l
668                 objects[l["uuid"]] = l
671             for r in root_owners:
# Owner entries: groups keyed by name, users by "First Last".
675                         contents[obr["name"]] = obr
676                     if "first_name" in obr:
677                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
# Top-level shared projects whose owner record is not itself visible.
680                     if r['owner_uuid'] not in objects:
681                         contents[r['name']] = r
683         # end with llfuse.lock_released, re-acquire lock
# One ProjectDirectory per (name, record) pair; i is a (name, obj) tuple.
686         self.merge(contents.items(),
688                    lambda a, i: a.uuid == i[1]['uuid'],
689                    lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
694 class FileHandle(object):
695     '''Connects a numeric file handle to a File or Directory object that has
696     been opened by the client.'''
# fh: the integer handle returned to FUSE; entry: the opened File, or for
# opendir a list of (name, object) pairs (body elided here).
698     def __init__(self, fh, entry):
703 class Inodes(object):
704     '''Manage the set of inodes. This is the mapping from a numeric id
705     to a concrete File or Directory object'''
# Inode numbers are handed out sequentially starting at the FUSE root inode.
709         self._counter = itertools.count(llfuse.ROOT_INODE)
711     def __getitem__(self, item):
712         return self._entries[item]
714     def __setitem__(self, key, item):
715         self._entries[key] = item
# __iter__/items() headers elided: expose the backing dict's views.
718         return self._entries.iterkeys()
721         return self._entries.items()
723     def __contains__(self, k):
724         return k in self._entries
726     def add_entry(self, entry):
# Assign the next inode number and register the object under it.
727         entry.inode = next(self._counter)
728         self._entries[entry.inode] = entry
731     def del_entry(self, entry):
# Tell the kernel to drop its cached attributes before we forget the inode.
732         llfuse.invalidate_inode(entry.inode)
733         del self._entries[entry.inode]
735 class Operations(llfuse.Operations):
736     '''This is the main interface with llfuse. The methods on this object are
737     called by llfuse threads to service FUSE events to query and read from
740     llfuse has its own global lock which is acquired before calling a request handler,
741     so request handlers do not run concurrently unless the lock is explicitly released
742     using "with llfuse.lock_released:"'''
744     def __init__(self, uid, gid, encoding="utf-8"):
745         super(Operations, self).__init__()
747         self.inodes = Inodes()
# encoding: charset used to decode/encode filenames at the FUSE boundary.
750         self.encoding = encoding
752         # dict of inode to filehandle
753         self._filehandles = {}
# Monotonic counter for issuing file/directory handle numbers.
754         self._filehandles_counter = 1
756         # Other threads that need to wait until the fuse driver
757         # is fully initialized should wait() on this event object.
758         self.initlock = threading.Event()
# init() handler (header elided):
761         # Allow threads that are waiting for the driver to be finished
762         # initializing to continue
765     def access(self, inode, mode, ctx):
768     def getattr(self, inode):
769         if inode not in self.inodes:
770             raise llfuse.FUSEError(errno.ENOENT)
772         e = self.inodes[inode]
774         entry = llfuse.EntryAttributes()
# Let the kernel cache lookups/attributes for 5 minutes.
777         entry.entry_timeout = 300
778         entry.attr_timeout = 300
# Read-only mount: world-readable, execute bit only on directories and
# StreamReaderFiles.
780         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
781         if isinstance(e, Directory):
782             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
783         elif isinstance(e, StreamReaderFile):
784             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
786             entry.st_mode |= stat.S_IFREG
789         entry.st_uid = self.uid
790         entry.st_gid = self.gid
793         entry.st_size = e.size()
795         entry.st_blksize = 512
796         entry.st_blocks = (e.size()/512)+1
797         entry.st_atime = int(e.atime())
798         entry.st_mtime = int(e.mtime())
799         entry.st_ctime = int(e.mtime())
803     def lookup(self, parent_inode, name):
# FUSE hands us bytes; decode with the configured filename encoding.
804         name = unicode(name, self.encoding)
805         _logger.debug("arv-mount lookup: parent_inode %i name %s",
# Resolve '.'/'..' and named children (some branches elided).
812         if parent_inode in self.inodes:
813             p = self.inodes[parent_inode]
815                 inode = p.parent_inode
816             elif isinstance(p, Directory) and name in p:
817                 inode = p[name].inode
820             return self.getattr(inode)
822             raise llfuse.FUSEError(errno.ENOENT)
824     def open(self, inode, flags):
825         if inode in self.inodes:
826             p = self.inodes[inode]
828             raise llfuse.FUSEError(errno.ENOENT)
# Read-only filesystem: refuse any write access mode.
830         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
831             raise llfuse.FUSEError(errno.EROFS)
833         if isinstance(p, Directory):
834             raise llfuse.FUSEError(errno.EISDIR)
836         fh = self._filehandles_counter
837         self._filehandles_counter += 1
838         self._filehandles[fh] = FileHandle(fh, p)
841     def read(self, fh, off, size):
842         _logger.debug("arv-mount read %i %i %i", fh, off, size)
843         if fh in self._filehandles:
844             handle = self._filehandles[fh]
846             raise llfuse.FUSEError(errno.EBADF)
# Track access time for FreshBase staleness checks.
849         handle.entry._atime = time.time()
# Drop the global lock while reading from Keep (network I/O).
852             with llfuse.lock_released:
853                 return handle.entry.readfrom(off, size)
854         except arvados.errors.NotFoundError as e:
855             _logger.warning("Block not found: " + str(e))
856             raise llfuse.FUSEError(errno.EIO)
# Catch-all (except line elided): surface any other failure as EIO.
859             raise llfuse.FUSEError(errno.EIO)
861     def release(self, fh):
862         if fh in self._filehandles:
863             del self._filehandles[fh]
865     def opendir(self, inode):
866         _logger.debug("arv-mount opendir: inode %i", inode)
868         if inode in self.inodes:
869             p = self.inodes[inode]
871             raise llfuse.FUSEError(errno.ENOENT)
873         if not isinstance(p, Directory):
874             raise llfuse.FUSEError(errno.ENOTDIR)
876         fh = self._filehandles_counter
877         self._filehandles_counter += 1
878         if p.parent_inode in self.inodes:
879             parent = self.inodes[p.parent_inode]
881             raise llfuse.FUSEError(errno.EIO)
# update(), etc. happen in "lookup" so do it before readdir is called.
884         p._atime = time.time()
# Snapshot the listing now so readdir can iterate a stable list.
886         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
889     def readdir(self, fh, off):
890         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
892         if fh in self._filehandles:
893             handle = self._filehandles[fh]
895             raise llfuse.FUSEError(errno.EBADF)
897         _logger.debug("arv-mount handle.entry %s", handle.entry)
# Generator protocol: yield (encoded name, attrs, next offset); entries
# whose inode has been deleted are skipped (guard lines partially elided).
900         while e < len(handle.entry):
901             if handle.entry[e][1].inode in self.inodes:
903                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
904                 except UnicodeEncodeError:
908     def releasedir(self, fh):
909         del self._filehandles[fh]
# statfs handler (header elided): report fixed, mostly-zero filesystem stats.
912         st = llfuse.StatvfsData()
913         st.f_bsize = 64 * 1024
926     # The llfuse documentation recommends only overloading functions that
927     # are actually implemented, as the default implementation will raise ENOSYS.
928     # However, there is a bug in the llfuse default implementation of create()
929     # "create() takes exactly 5 positional arguments (6 given)" which will crash
931     # The workaround is to implement it with the proper number of parameters,
932     # and then everything works out.
933     def create(self, p1, p2, p3, p4, p5):
934         raise llfuse.FUSEError(errno.EROFS)