2 # FUSE driver for Arvados Keep
8 from llfuse import FUSEError
22 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
24 _logger = logging.getLogger('arvados.arvados_fuse')
26 class SafeApi(object):
27 '''Threadsafe wrapper for API object. This stores and returns a different api
28 object per thread, because httplib2 which underlies apiclient is not
def __init__(self, config):
    '''Record the connection settings and set up per-instance state.

    config must provide get() and flag_is_true(); the settings read
    are ARVADOS_API_HOST, ARVADOS_API_TOKEN and
    ARVADOS_API_HOST_INSECURE.
    '''
    read_setting = config.get
    self.host = read_setting('ARVADOS_API_HOST')
    self.api_token = read_setting('ARVADOS_API_TOKEN')
    self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
    # Thread-local holder for the attributes that are kept per thread.
    self.local = threading.local()
    # One block cache object is created for this wrapper instance.
    self.block_cache = arvados.KeepBlockCache()
40 if 'api' not in self.local.__dict__:
41 self.local.api = arvados.api('v1', False, self.host,
42 self.api_token, self.insecure)
46 if 'keep' not in self.local.__dict__:
47 self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
48 return self.local.keep
50 def __getattr__(self, name):
51 # Proxy nonexistent attributes to the local API client.
53 return getattr(self.localapi(), name)
54 except AttributeError:
55 return super(SafeApi, self).__getattr__(name)
59 '''Parse Arvados timestamp to unix time.'''
61 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
62 except (TypeError, ValueError):
65 def sanitize_filename(dirty):
66 '''Remove troublesome characters from filenames.'''
67 # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
73 if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
74 # skip control characters and /
78 # strip leading - or ~ and leading/trailing whitespace
79 stripped = fn.lstrip("-~ ").rstrip()
86 class FreshBase(object):
87 '''Base class for maintaining fresh/stale state to determine when to update.'''
91 self._last_update = time.time()
92 self._atime = time.time()
95 # Mark the value as stale
99 # Test if the entries dict is stale.
104 return (self._last_update + self._poll_time) < self._atime
109 self._last_update = time.time()
114 class File(FreshBase):
115 '''Base for file objects.'''
117 def __init__(self, parent_inode, _mtime=0):
118 super(File, self).__init__()
120 self.parent_inode = parent_inode
126 def readfrom(self, off, size):
133 class StreamReaderFile(File):
134 '''Wraps a StreamFileReader as a file.'''
136 def __init__(self, parent_inode, reader, _mtime):
137 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
141 return self.reader.size()
def readfrom(self, off, size):
    '''Return the result of the wrapped reader's readfrom(off, size).'''
    wrapped = self.reader
    data = wrapped.readfrom(off, size)
    return data
150 class StringFile(File):
151 '''Wrap a simple string as a file'''
def __init__(self, parent_inode, contents, _mtime):
    '''Initialise the base class with parent_inode and _mtime, then keep contents.'''
    base = super(StringFile, self)
    base.__init__(parent_inode, _mtime)
    # The value exposed through size() and readfrom().
    self.contents = contents
157 return len(self.contents)
def readfrom(self, off, size):
    '''Return the slice of self.contents from off up to off + size.'''
    end = off + size
    return self.contents[off:end]
163 class ObjectFile(StringFile):
164 '''Wrap a dict as a serialized json object.'''
166 def __init__(self, parent_inode, obj):
167 super(ObjectFile, self).__init__(parent_inode, "", 0)
168 self.uuid = obj['uuid']
def update(self, obj):
    '''Refresh the modification time and the serialized contents from obj.

    The modification time is convertTime(obj['modified_at']) when that
    key is present and 0 otherwise.  The contents become obj dumped as
    JSON with sorted keys and a 4-space indent, followed by a newline.
    '''
    if 'modified_at' in obj:
        self._mtime = convertTime(obj['modified_at'])
    else:
        self._mtime = 0
    serialized = json.dumps(obj, indent=4, sort_keys=True)
    self.contents = serialized + "\n"
176 class Directory(FreshBase):
177 '''Generic directory object, backed by a dict.
178 Consists of a set of entries with the key representing the filename
179 and the value referencing a File or Directory object.
182 def __init__(self, parent_inode):
183 super(Directory, self).__init__()
185 '''parent_inode is the integer inode number'''
187 if not isinstance(parent_inode, int):
188 raise Exception("parent_inode should be an int")
189 self.parent_inode = parent_inode
191 self._mtime = time.time()
193 # Overridden by subclasses to implement logic to update the entries dict
194 # when the directory is stale
198 # Only used when computing the size of the disk footprint of the directory
203 def checkupdate(self):
207 except apiclient.errors.HttpError as e:
210 def __getitem__(self, item):
212 return self._entries[item]
216 return self._entries.items()
220 return self._entries.iterkeys()
222 def __contains__(self, k):
224 return k in self._entries
226 def merge(self, items, fn, same, new_entry):
227 '''Helper method for updating the contents of the directory. Takes a list
228 describing the new contents of the directory, reuse entries that are
229 the same in both the old and new lists, create new entries, and delete
230 old entries missing from the new list.
232 items: iterable with new directory contents
234 fn: function to take an entry in 'items' and return the desired file or
235 directory name, or None if this entry should be skipped
237 same: function to compare an existing entry (a File or Directory
238 object) with an entry in the items list to determine whether to keep
241 new_entry: function to create a new directory entry (File or Directory
242 object) from an entry in the items list.
246 oldentries = self._entries
250 name = sanitize_filename(fn(i))
252 if name in oldentries and same(oldentries[name], i):
253 # move existing directory entry over
254 self._entries[name] = oldentries[name]
257 # create new directory entry
260 self._entries[name] = self.inodes.add_entry(ent)
263 # delete any other directory entries that were not found in 'items'
265 llfuse.invalidate_entry(self.inode, str(i))
266 self.inodes.del_entry(oldentries[i])
270 self._mtime = time.time()
275 '''Delete all entries'''
276 oldentries = self._entries
279 if isinstance(n, Directory):
281 llfuse.invalidate_entry(self.inode, str(n))
282 self.inodes.del_entry(oldentries[n])
289 class CollectionDirectory(Directory):
290 '''Represents the root of a directory tree holding a collection.'''
292 def __init__(self, parent_inode, inodes, api, collection):
293 super(CollectionDirectory, self).__init__(parent_inode)
296 self.collection_object_file = None
297 self.collection_object = None
298 if isinstance(collection, dict):
299 self.collection_locator = collection['uuid']
301 self.collection_locator = collection
304 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
306 def new_collection(self, new_collection_object):
307 self.collection_object = new_collection_object
309 if self.collection_object_file is not None:
310 self.collection_object_file.update(self.collection_object)
313 collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api, self.api.localkeep())
314 for s in collection.all_streams():
316 for part in s.name().split('/'):
317 if part != '' and part != '.':
318 partname = sanitize_filename(part)
319 if partname not in cwd._entries:
320 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
321 cwd = cwd._entries[partname]
322 for k, v in s.files().items():
323 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
327 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
330 with llfuse.lock_released:
331 new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
332 if "portable_data_hash" not in new_collection_object:
333 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
334 # end with llfuse.lock_released, re-acquire lock
336 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
337 self.new_collection(new_collection_object)
341 except apiclient.errors.HttpError as e:
342 if e.resp.status == 404:
343 _logger.warn("arv-mount %s: not found", self.collection_locator)
345 _logger.error("arv-mount %s: error", self.collection_locator)
346 _logger.exception(detail)
347 except arvados.errors.ArgumentError as detail:
348 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
349 if self.collection_object is not None and "manifest_text" in self.collection_object:
350 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
351 except Exception as detail:
352 _logger.error("arv-mount %s: error", self.collection_locator)
353 if self.collection_object is not None and "manifest_text" in self.collection_object:
354 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
355 _logger.exception(detail)
358 def __getitem__(self, item):
360 if item == '.arvados#collection':
361 if self.collection_object_file is None:
362 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
363 self.inodes.add_entry(self.collection_object_file)
364 return self.collection_object_file
366 return super(CollectionDirectory, self).__getitem__(item)
368 def __contains__(self, k):
369 if k == '.arvados#collection':
372 return super(CollectionDirectory, self).__contains__(k)
376 return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
379 class MagicDirectory(Directory):
380 '''A special directory that logically contains the set of all extant keep
381 locators. When a file is referenced by lookup(), it is tested to see if it
382 is a valid keep locator to a manifest, and if so, loads the manifest
383 contents as a subdirectory of this directory with the locator as the
384 directory name. Since querying a list of all extant keep locators is
385 impractical, only collections that have already been accessed are visible
389 def __init__(self, parent_inode, inodes, api):
390 super(MagicDirectory, self).__init__(parent_inode)
393 # Have to defer creating readme_file because at this point we don't
394 # yet have an inode assigned.
395 self.readme_file = None
397 def create_readme(self):
398 if self.readme_file is None:
399 text = '''This directory provides access to Arvados collections as subdirectories listed
400 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
401 the form '1234567890abcdefghijklmnopqrstuv+123').
403 Note that this directory will appear empty until you attempt to access a
404 specific collection subdirectory (such as trying to 'cd' into it), at which
405 point the collection will actually be looked up on the server and the directory
406 will appear if it exists.
408 self.readme_file = self.inodes.add_entry(StringFile(self.inode, text, time.time()))
409 self._entries["README"] = self.readme_file
411 def __contains__(self, k):
414 if k in self._entries:
417 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
421 e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
427 except Exception as e:
428 _logger.debug('arv-mount exception keep %s', e)
433 return self._entries.items()
435 def __getitem__(self, item):
437 return self._entries[item]
439 raise KeyError("No collection with id " + item)
442 class RecursiveInvalidateDirectory(Directory):
443 def invalidate(self):
444 if self.inode == llfuse.ROOT_INODE:
445 llfuse.lock.acquire()
447 super(RecursiveInvalidateDirectory, self).invalidate()
448 for a in self._entries:
449 self._entries[a].invalidate()
450 except Exception as e:
453 if self.inode == llfuse.ROOT_INODE:
454 llfuse.lock.release()
457 class TagsDirectory(RecursiveInvalidateDirectory):
458 '''A special directory that contains as subdirectories all tags visible to the user.'''
460 def __init__(self, parent_inode, inodes, api, poll_time=60):
461 super(TagsDirectory, self).__init__(parent_inode)
465 self._poll_time = poll_time
468 with llfuse.lock_released:
469 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
471 self.merge(tags['items'],
472 lambda i: i['name'] if 'name' in i else i['uuid'],
473 lambda a, i: a.tag == i,
474 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
477 class TagDirectory(Directory):
478 '''A special directory that contains as subdirectories all collections visible
479 to the user that are tagged with a particular tag.
482 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
483 super(TagDirectory, self).__init__(parent_inode)
488 self._poll_time = poll_time
491 with llfuse.lock_released:
492 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
493 ['name', '=', self.tag],
494 ['head_uuid', 'is_a', 'arvados#collection']],
495 select=['head_uuid']).execute()
496 self.merge(taggedcollections['items'],
497 lambda i: i['head_uuid'],
498 lambda a, i: a.collection_locator == i['head_uuid'],
499 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))
502 class ProjectDirectory(Directory):
503 '''A special directory that contains the contents of a project.'''
505 def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
506 super(ProjectDirectory, self).__init__(parent_inode)
509 self.project_object = project_object
510 self.project_object_file = None
511 self.uuid = project_object['uuid']
513 def createDirectory(self, i):
514 if collection_uuid_pattern.match(i['uuid']):
515 return CollectionDirectory(self.inode, self.inodes, self.api, i)
516 elif group_uuid_pattern.match(i['uuid']):
517 return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
518 elif link_uuid_pattern.match(i['uuid']):
519 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
520 return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
523 elif uuid_pattern.match(i['uuid']):
524 return ObjectFile(self.parent_inode, i)
529 if self.project_object_file == None:
530 self.project_object_file = ObjectFile(self.inode, self.project_object)
531 self.inodes.add_entry(self.project_object_file)
535 if i['name'] is None or len(i['name']) == 0:
537 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
538 # collection or subproject
540 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
543 elif 'kind' in i and i['kind'].startswith('arvados#'):
545 return "{}.{}".format(i['name'], i['kind'][8:])
550 if isinstance(a, CollectionDirectory):
551 return a.collection_locator == i['uuid']
552 elif isinstance(a, ProjectDirectory):
553 return a.uuid == i['uuid']
554 elif isinstance(a, ObjectFile):
555 return a.uuid == i['uuid'] and not a.stale()
558 with llfuse.lock_released:
559 if group_uuid_pattern.match(self.uuid):
560 self.project_object = self.api.groups().get(uuid=self.uuid).execute()
561 elif user_uuid_pattern.match(self.uuid):
562 self.project_object = self.api.users().get(uuid=self.uuid).execute()
564 contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
565 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
566 contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
568 # end with llfuse.lock_released, re-acquire lock
573 self.createDirectory)
575 def __getitem__(self, item):
577 if item == '.arvados#project':
578 return self.project_object_file
580 return super(ProjectDirectory, self).__getitem__(item)
582 def __contains__(self, k):
583 if k == '.arvados#project':
586 return super(ProjectDirectory, self).__contains__(k)
589 class SharedDirectory(Directory):
590 '''A special directory that represents users or groups who have shared projects with me.'''
592 def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
593 super(SharedDirectory, self).__init__(parent_inode)
594 self.current_user = api.users().current().execute()
598 self._poll_time = poll_time
601 with llfuse.lock_released:
602 all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
604 for ob in all_projects:
605 objects[ob['uuid']] = ob
609 for ob in all_projects:
610 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
612 root_owners[ob['owner_uuid']] = True
614 lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
615 lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
621 objects[l["uuid"]] = l
623 objects[l["uuid"]] = l
626 for r in root_owners:
630 contents[obr["name"]] = obr
631 if "first_name" in obr:
632 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
635 if r['owner_uuid'] not in objects:
636 contents[r['name']] = r
638 # end with llfuse.lock_released, re-acquire lock
641 self.merge(contents.items(),
643 lambda a, i: a.uuid == i[1]['uuid'],
644 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
645 except Exception as e:
649 class FileHandle(object):
650 '''Connects a numeric file handle to a File or Directory object that has
651 been opened by the client.'''
653 def __init__(self, fh, entry):
658 class Inodes(object):
659 '''Manage the set of inodes. This is the mapping from a numeric id
660 to a concrete File or Directory object'''
664 self._counter = llfuse.ROOT_INODE
666 def __getitem__(self, item):
667 return self._entries[item]
669 def __setitem__(self, key, item):
670 self._entries[key] = item
673 return self._entries.iterkeys()
676 return self._entries.items()
678 def __contains__(self, k):
679 return k in self._entries
681 def add_entry(self, entry):
682 entry.inode = self._counter
683 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Invalidate entry's inode through llfuse, then drop it from self._entries.'''
    inode = entry.inode
    llfuse.invalidate_inode(inode)
    del self._entries[inode]
691 class Operations(llfuse.Operations):
692 '''This is the main interface with llfuse. The methods on this object are
693 called by llfuse threads to service FUSE events to query and read from
696 llfuse has its own global lock which is acquired before calling a request handler,
697 so request handlers do not run concurrently unless the lock is explicitly released
698 using "with llfuse.lock_released:"'''
700 def __init__(self, uid, gid):
701 super(Operations, self).__init__()
703 self.inodes = Inodes()
707 # dict of inode to filehandle
708 self._filehandles = {}
709 self._filehandles_counter = 1
711 # Other threads that need to wait until the fuse driver
712 # is fully initialized should wait() on this event object.
713 self.initlock = threading.Event()
716 # Allow threads that are waiting for the driver to be finished
717 # initializing to continue
720 def access(self, inode, mode, ctx):
723 def getattr(self, inode):
724 if inode not in self.inodes:
725 raise llfuse.FUSEError(errno.ENOENT)
727 e = self.inodes[inode]
729 entry = llfuse.EntryAttributes()
732 entry.entry_timeout = 300
733 entry.attr_timeout = 300
735 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
736 if isinstance(e, Directory):
737 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
739 entry.st_mode |= stat.S_IFREG
742 entry.st_uid = self.uid
743 entry.st_gid = self.gid
746 entry.st_size = e.size()
748 entry.st_blksize = 512
749 entry.st_blocks = (e.size()/512)+1
750 entry.st_atime = int(e.atime())
751 entry.st_mtime = int(e.mtime())
752 entry.st_ctime = int(e.mtime())
756 def lookup(self, parent_inode, name):
757 _logger.debug("arv-mount lookup: parent_inode %i name %s",
764 if parent_inode in self.inodes:
765 p = self.inodes[parent_inode]
767 inode = p.parent_inode
768 elif isinstance(p, Directory) and name in p:
769 inode = p[name].inode
772 return self.getattr(inode)
774 raise llfuse.FUSEError(errno.ENOENT)
776 def open(self, inode, flags):
777 if inode in self.inodes:
778 p = self.inodes[inode]
780 raise llfuse.FUSEError(errno.ENOENT)
782 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
783 raise llfuse.FUSEError(errno.EROFS)
785 if isinstance(p, Directory):
786 raise llfuse.FUSEError(errno.EISDIR)
788 fh = self._filehandles_counter
789 self._filehandles_counter += 1
790 self._filehandles[fh] = FileHandle(fh, p)
793 def read(self, fh, off, size):
794 _logger.debug("arv-mount read %i %i %i", fh, off, size)
795 if fh in self._filehandles:
796 handle = self._filehandles[fh]
798 raise llfuse.FUSEError(errno.EBADF)
801 handle.entry._atime = time.time()
804 with llfuse.lock_released:
805 return handle.entry.readfrom(off, size)
806 except arvados.errors.NotFoundError as e:
807 _logger.warning("Block not found: " + str(e))
808 raise llfuse.FUSEError(errno.EIO)
809 except Exception as e:
811 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the file handle fh; a handle that is not registered is ignored.'''
    # pop() with a default removes the key when present and is a no-op otherwise.
    self._filehandles.pop(fh, None)
817 def opendir(self, inode):
818 _logger.debug("arv-mount opendir: inode %i", inode)
820 if inode in self.inodes:
821 p = self.inodes[inode]
823 raise llfuse.FUSEError(errno.ENOENT)
825 if not isinstance(p, Directory):
826 raise llfuse.FUSEError(errno.ENOTDIR)
828 fh = self._filehandles_counter
829 self._filehandles_counter += 1
830 if p.parent_inode in self.inodes:
831 parent = self.inodes[p.parent_inode]
833 raise llfuse.FUSEError(errno.EIO)
836 p._atime = time.time()
838 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
841 def readdir(self, fh, off):
842 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
844 if fh in self._filehandles:
845 handle = self._filehandles[fh]
847 raise llfuse.FUSEError(errno.EBADF)
849 _logger.debug("arv-mount handle.entry %s", handle.entry)
852 while e < len(handle.entry):
853 if handle.entry[e][1].inode in self.inodes:
854 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Forget the directory handle fh.

    A handle that is not present in self._filehandles is ignored, the
    same way release() treats unknown file handles, so releasing a
    handle twice no longer raises KeyError.
    '''
    self._filehandles.pop(fh, None)
861 st = llfuse.StatvfsData()
862 st.f_bsize = 64 * 1024
875 # The llfuse documentation recommends only overloading functions that
876 # are actually implemented, as the default implementation will raise ENOSYS.
877 # However, there is a bug in the llfuse default implementation of create()
878 # "create() takes exactly 5 positional arguments (6 given)" which will crash
880 # The workaround is to implement it with the proper number of parameters,
881 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Refuse every create request by raising FUSEError with errno.EROFS.

    The five positional parameters exist only so the signature accepts
    the arguments llfuse passes; none of them is used.
    '''
    read_only_error = llfuse.FUSEError(errno.EROFS)
    raise read_only_error