2 # FUSE driver for Arvados Keep
8 from llfuse import FUSEError
23 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
25 _logger = logging.getLogger('arvados.arvados_fuse')
27 class SafeApi(object):
28 '''Threadsafe wrapper for API object. This stores and returns a different api
29 object per thread, because httplib2 which underlies apiclient is not
def __init__(self, config):
    '''Capture the API connection settings and set up per-thread storage.

    config: settings object; its get() is asked for ARVADOS_API_HOST and
    ARVADOS_API_TOKEN, and its flag_is_true() for ARVADOS_API_HOST_INSECURE.
    '''
    setting = config.get
    self.host = setting('ARVADOS_API_HOST')
    self.api_token = setting('ARVADOS_API_TOKEN')
    self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
    # Thread-local namespace that holds each thread's own client objects.
    self.local = threading.local()
    # Single block cache passed to the Keep client built for each thread.
    self.block_cache = arvados.KeepBlockCache()
41 if 'api' not in self.local.__dict__:
42 self.local.api = arvados.api('v1', False, self.host,
43 self.api_token, self.insecure)
47 if 'keep' not in self.local.__dict__:
48 self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
49 return self.local.keep
51 def __getattr__(self, name):
52 # Proxy nonexistent attributes to the local API client.
54 return getattr(self.localapi(), name)
55 except AttributeError:
56 return super(SafeApi, self).__getattr__(name)
60 '''Parse Arvados timestamp to unix time.'''
62 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
63 except (TypeError, ValueError):
66 def sanitize_filename(dirty):
67 '''Remove troublesome characters from filenames.'''
68 # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
74 if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
75 # skip control characters and /
79 # strip leading - or ~ and leading/trailing whitespace
80 stripped = fn.lstrip("-~ ").rstrip()
87 class FreshBase(object):
88 '''Base class for maintaining fresh/stale state to determine when to update.'''
92 self._last_update = time.time()
93 self._atime = time.time()
96 # Mark the value as stale
100 # Test if the entries dict is stale.
105 return (self._last_update + self._poll_time) < self._atime
110 self._last_update = time.time()
115 class File(FreshBase):
116 '''Base for file objects.'''
118 def __init__(self, parent_inode, _mtime=0):
119 super(File, self).__init__()
121 self.parent_inode = parent_inode
127 def readfrom(self, off, size):
134 class StreamReaderFile(File):
135 '''Wraps a StreamFileReader as a file.'''
137 def __init__(self, parent_inode, reader, _mtime):
138 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
142 return self.reader.size()
def readfrom(self, off, size):
    '''Forward (off, size) to the wrapped reader's readfrom() and return
    whatever it returns.'''
    wrapped = self.reader
    return wrapped.readfrom(off, size)
151 class StringFile(File):
152 '''Wrap a simple string as a file'''
def __init__(self, parent_inode, contents, _mtime):
    '''Create a file whose data is the in-memory value 'contents'.

    parent_inode and _mtime are handed unchanged to the parent class
    initializer.
    '''
    parent_init = super(StringFile, self).__init__
    parent_init(parent_inode, _mtime)
    self.contents = contents
158 return len(self.contents)
def readfrom(self, off, size):
    '''Return the slice of the stored contents that starts at index 'off'
    and is at most 'size' items long.'''
    end = off + size
    return self.contents[off:end]
164 class ObjectFile(StringFile):
165 '''Wrap a dict as a serialized json object.'''
167 def __init__(self, parent_inode, obj):
168 super(ObjectFile, self).__init__(parent_inode, "", 0)
169 self.uuid = obj['uuid']
def update(self, obj):
    '''Refresh this file from the dict 'obj'.

    The modification time is taken from obj['modified_at'] (converted with
    convertTime) when that key is present, otherwise it is set to 0.  The
    contents become the dict serialized as key-sorted JSON indented by 4,
    followed by a newline.
    '''
    if 'modified_at' in obj:
        self._mtime = convertTime(obj['modified_at'])
    else:
        self._mtime = 0
    serialized = json.dumps(obj, indent=4, sort_keys=True)
    self.contents = serialized + "\n"
177 class Directory(FreshBase):
178 '''Generic directory object, backed by a dict.
179 Consists of a set of entries with the key representing the filename
180 and the value referencing a File or Directory object.
183 def __init__(self, parent_inode):
184 super(Directory, self).__init__()
186 '''parent_inode is the integer inode number'''
188 if not isinstance(parent_inode, int):
189 raise Exception("parent_inode should be an int")
190 self.parent_inode = parent_inode
192 self._mtime = time.time()
194 # Overridden by subclasses to implement logic to update the entries dict
195 # when the directory is stale
199 # Only used when computing the size of the disk footprint of the directory
204 def checkupdate(self):
208 except apiclient.errors.HttpError as e:
211 def __getitem__(self, item):
213 return self._entries[item]
217 return self._entries.items()
221 return self._entries.iterkeys()
223 def __contains__(self, k):
225 return k in self._entries
227 def merge(self, items, fn, same, new_entry):
228 '''Helper method for updating the contents of the directory. Takes a list
229 describing the new contents of the directory, reuses entries that are
230 the same in both the old and new lists, creates new entries, and deletes
231 old entries missing from the new list.
233 items: iterable with new directory contents
235 fn: function to take an entry in 'items' and return the desired file or
236 directory name, or None if this entry should be skipped
238 same: function to compare an existing entry (a File or Directory
239 object) with an entry in the items list to determine whether to keep
242 new_entry: function to create a new directory entry (File or Directory
243 object) from an entry in the items list.
247 oldentries = self._entries
251 name = sanitize_filename(fn(i))
253 if name in oldentries and same(oldentries[name], i):
254 # move existing directory entry over
255 self._entries[name] = oldentries[name]
258 # create new directory entry
261 self._entries[name] = self.inodes.add_entry(ent)
264 # delete any other directory entries that were not found in 'items'
266 llfuse.invalidate_entry(self.inode, str(i))
267 self.inodes.del_entry(oldentries[i])
271 self._mtime = time.time()
276 '''Delete all entries'''
277 oldentries = self._entries
280 if isinstance(n, Directory):
282 llfuse.invalidate_entry(self.inode, str(n))
283 self.inodes.del_entry(oldentries[n])
290 class CollectionDirectory(Directory):
291 '''Represents the root of a directory tree holding a collection.'''
293 def __init__(self, parent_inode, inodes, api, num_retries, collection):
294 super(CollectionDirectory, self).__init__(parent_inode)
297 self.num_retries = num_retries
298 self.collection_object_file = None
299 self.collection_object = None
300 if isinstance(collection, dict):
301 self.collection_locator = collection['uuid']
303 self.collection_locator = collection
306 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
308 def new_collection(self, new_collection_object):
309 self.collection_object = new_collection_object
311 if self.collection_object_file is not None:
312 self.collection_object_file.update(self.collection_object)
315 collection = arvados.CollectionReader(
316 self.collection_object["manifest_text"], self.api,
317 self.api.localkeep(), num_retries=self.num_retries)
318 for s in collection.all_streams():
320 for part in s.name().split('/'):
321 if part != '' and part != '.':
322 partname = sanitize_filename(part)
323 if partname not in cwd._entries:
324 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
325 cwd = cwd._entries[partname]
326 for k, v in s.files().items():
327 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
331 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
334 with llfuse.lock_released:
335 new_collection_object = self.api.collections().get(
336 uuid=self.collection_locator
337 ).execute(num_retries=self.num_retries)
338 if "portable_data_hash" not in new_collection_object:
339 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
340 # end with llfuse.lock_released, re-acquire lock
342 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
343 self.new_collection(new_collection_object)
347 except apiclient.errors.HttpError as e:
348 if e.resp.status == 404:
349 _logger.warn("arv-mount %s: not found", self.collection_locator)
351 _logger.error("arv-mount %s: error", self.collection_locator)
352 _logger.exception(detail)
353 except arvados.errors.ArgumentError as detail:
354 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
355 if self.collection_object is not None and "manifest_text" in self.collection_object:
356 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
357 except Exception as detail:
358 _logger.error("arv-mount %s: error", self.collection_locator)
359 if self.collection_object is not None and "manifest_text" in self.collection_object:
360 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
361 _logger.exception(detail)
364 def __getitem__(self, item):
366 if item == '.arvados#collection':
367 if self.collection_object_file is None:
368 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
369 self.inodes.add_entry(self.collection_object_file)
370 return self.collection_object_file
372 return super(CollectionDirectory, self).__getitem__(item)
374 def __contains__(self, k):
375 if k == '.arvados#collection':
378 return super(CollectionDirectory, self).__contains__(k)
382 return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
385 class MagicDirectory(Directory):
386 '''A special directory that logically contains the set of all extant keep
387 locators. When a file is referenced by lookup(), it is tested to see if it
388 is a valid keep locator to a manifest, and if so, loads the manifest
389 contents as a subdirectory of this directory with the locator as the
390 directory name. Since querying a list of all extant keep locators is
391 impractical, only collections that have already been accessed are visible
395 def __init__(self, parent_inode, inodes, api, num_retries):
396 super(MagicDirectory, self).__init__(parent_inode)
399 self.num_retries = num_retries
400 # Have to defer creating readme_file because at this point we don't
401 # yet have an inode assigned.
402 self.readme_file = None
404 def create_readme(self):
405 if self.readme_file is None:
406 text = '''This directory provides access to Arvados collections as subdirectories listed
407 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
408 the form '1234567890abcdefghijklmnopqrstuv+123').
410 Note that this directory will appear empty until you attempt to access a
411 specific collection subdirectory (such as trying to 'cd' into it), at which
412 point the collection will actually be looked up on the server and the directory
413 will appear if it exists.
415 self.readme_file = self.inodes.add_entry(StringFile(self.inode, text, time.time()))
416 self._entries["README"] = self.readme_file
418 def __contains__(self, k):
421 if k in self._entries:
424 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
428 e = self.inodes.add_entry(CollectionDirectory(
429 self.inode, self.inodes, self.api, self.num_retries, k))
435 except Exception as e:
436 _logger.debug('arv-mount exception keep %s', e)
441 return self._entries.items()
443 def __getitem__(self, item):
445 return self._entries[item]
447 raise KeyError("No collection with id " + item)
450 class RecursiveInvalidateDirectory(Directory):
451 def invalidate(self):
452 if self.inode == llfuse.ROOT_INODE:
453 llfuse.lock.acquire()
455 super(RecursiveInvalidateDirectory, self).invalidate()
456 for a in self._entries:
457 self._entries[a].invalidate()
458 except Exception as e:
461 if self.inode == llfuse.ROOT_INODE:
462 llfuse.lock.release()
465 class TagsDirectory(RecursiveInvalidateDirectory):
466 '''A special directory that contains as subdirectories all tags visible to the user.'''
468 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
469 super(TagsDirectory, self).__init__(parent_inode)
472 self.num_retries = num_retries
474 self._poll_time = poll_time
477 with llfuse.lock_released:
478 tags = self.api.links().list(
479 filters=[['link_class', '=', 'tag']],
480 select=['name'], distinct=True
481 ).execute(num_retries=self.num_retries)
483 self.merge(tags['items'],
484 lambda i: i['name'] if 'name' in i else i['uuid'],
485 lambda a, i: a.tag == i,
486 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
489 class TagDirectory(Directory):
490 '''A special directory that contains as subdirectories all collections visible
491 to the user that are tagged with a particular tag.
494 def __init__(self, parent_inode, inodes, api, num_retries, tag,
495 poll=False, poll_time=60):
496 super(TagDirectory, self).__init__(parent_inode)
499 self.num_retries = num_retries
502 self._poll_time = poll_time
505 with llfuse.lock_released:
506 taggedcollections = self.api.links().list(
507 filters=[['link_class', '=', 'tag'],
508 ['name', '=', self.tag],
509 ['head_uuid', 'is_a', 'arvados#collection']],
511 ).execute(num_retries=self.num_retries)
512 self.merge(taggedcollections['items'],
513 lambda i: i['head_uuid'],
514 lambda a, i: a.collection_locator == i['head_uuid'],
515 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
518 class ProjectDirectory(Directory):
519 '''A special directory that contains the contents of a project.'''
521 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
522 poll=False, poll_time=60):
523 super(ProjectDirectory, self).__init__(parent_inode)
526 self.num_retries = num_retries
527 self.project_object = project_object
528 self.project_object_file = None
529 self.uuid = project_object['uuid']
531 def createDirectory(self, i):
532 if collection_uuid_pattern.match(i['uuid']):
533 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
534 elif group_uuid_pattern.match(i['uuid']):
535 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
536 elif link_uuid_pattern.match(i['uuid']):
537 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
538 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
541 elif uuid_pattern.match(i['uuid']):
542 return ObjectFile(self.parent_inode, i)
547 if self.project_object_file == None:
548 self.project_object_file = ObjectFile(self.inode, self.project_object)
549 self.inodes.add_entry(self.project_object_file)
553 if i['name'] is None or len(i['name']) == 0:
555 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
556 # collection or subproject
558 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
561 elif 'kind' in i and i['kind'].startswith('arvados#'):
563 return "{}.{}".format(i['name'], i['kind'][8:])
568 if isinstance(a, CollectionDirectory):
569 return a.collection_locator == i['uuid']
570 elif isinstance(a, ProjectDirectory):
571 return a.uuid == i['uuid']
572 elif isinstance(a, ObjectFile):
573 return a.uuid == i['uuid'] and not a.stale()
576 with llfuse.lock_released:
577 if group_uuid_pattern.match(self.uuid):
578 self.project_object = self.api.groups().get(
579 uuid=self.uuid).execute(num_retries=self.num_retries)
580 elif user_uuid_pattern.match(self.uuid):
581 self.project_object = self.api.users().get(
582 uuid=self.uuid).execute(num_retries=self.num_retries)
584 contents = arvados.util.list_all(self.api.groups().contents,
585 self.num_retries, uuid=self.uuid)
586 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
587 contents += arvados.util.list_all(
588 self.api.links().list, self.num_retries,
589 filters=[['tail_uuid', '=', self.uuid],
590 ['link_class', '=', 'name']])
592 # end with llfuse.lock_released, re-acquire lock
597 self.createDirectory)
599 def __getitem__(self, item):
601 if item == '.arvados#project':
602 return self.project_object_file
604 return super(ProjectDirectory, self).__getitem__(item)
606 def __contains__(self, k):
607 if k == '.arvados#project':
610 return super(ProjectDirectory, self).__contains__(k)
613 class SharedDirectory(Directory):
614 '''A special directory that represents users or groups who have shared projects with me.'''
616 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
617 poll=False, poll_time=60):
618 super(SharedDirectory, self).__init__(parent_inode)
621 self.num_retries = num_retries
622 self.current_user = api.users().current().execute(num_retries=num_retries)
624 self._poll_time = poll_time
627 with llfuse.lock_released:
628 all_projects = arvados.util.list_all(
629 self.api.groups().list, self.num_retries,
630 filters=[['group_class','=','project']])
632 for ob in all_projects:
633 objects[ob['uuid']] = ob
637 for ob in all_projects:
638 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
640 root_owners[ob['owner_uuid']] = True
642 lusers = arvados.util.list_all(
643 self.api.users().list, self.num_retries,
644 filters=[['uuid','in', list(root_owners)]])
645 lgroups = arvados.util.list_all(
646 self.api.groups().list, self.num_retries,
647 filters=[['uuid','in', list(root_owners)]])
653 objects[l["uuid"]] = l
655 objects[l["uuid"]] = l
658 for r in root_owners:
662 contents[obr["name"]] = obr
663 if "first_name" in obr:
664 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
667 if r['owner_uuid'] not in objects:
668 contents[r['name']] = r
670 # end with llfuse.lock_released, re-acquire lock
673 self.merge(contents.items(),
675 lambda a, i: a.uuid == i[1]['uuid'],
676 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
677 except Exception as e:
681 class FileHandle(object):
682 '''Connects a numeric file handle to a File or Directory object that has
683 been opened by the client.'''
685 def __init__(self, fh, entry):
690 class Inodes(object):
691 '''Manage the set of inodes. This is the mapping from a numeric id
692 to a concrete File or Directory object'''
696 self._counter = llfuse.ROOT_INODE
698 def __getitem__(self, item):
699 return self._entries[item]
701 def __setitem__(self, key, item):
702 self._entries[key] = item
705 return self._entries.iterkeys()
708 return self._entries.items()
710 def __contains__(self, k):
711 return k in self._entries
713 def add_entry(self, entry):
714 entry.inode = self._counter
715 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Drop 'entry' from the table after asking llfuse to invalidate its inode.

    Raises KeyError when entry.inode is not a key of the table.
    '''
    inode = entry.inode
    llfuse.invalidate_inode(inode)
    self._entries.pop(inode)
723 class Operations(llfuse.Operations):
724 '''This is the main interface with llfuse. The methods on this object are
725 called by llfuse threads to service FUSE events to query and read from
728 llfuse has its own global lock which is acquired before calling a request handler,
729 so request handlers do not run concurrently unless the lock is explicitly released
730 using "with llfuse.lock_released:"'''
732 def __init__(self, uid, gid, encoding="utf-8"):
733 super(Operations, self).__init__()
735 self.inodes = Inodes()
738 self.encoding = encoding
740 # dict of inode to filehandle
741 self._filehandles = {}
742 self._filehandles_counter = 1
744 # Other threads that need to wait until the fuse driver
745 # is fully initialized should wait() on this event object.
746 self.initlock = threading.Event()
749 # Allow threads that are waiting for the driver to be finished
750 # initializing to continue
753 def access(self, inode, mode, ctx):
756 def getattr(self, inode):
757 if inode not in self.inodes:
758 raise llfuse.FUSEError(errno.ENOENT)
760 e = self.inodes[inode]
762 entry = llfuse.EntryAttributes()
765 entry.entry_timeout = 300
766 entry.attr_timeout = 300
768 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
769 if isinstance(e, Directory):
770 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
772 entry.st_mode |= stat.S_IFREG
775 entry.st_uid = self.uid
776 entry.st_gid = self.gid
779 entry.st_size = e.size()
781 entry.st_blksize = 512
782 entry.st_blocks = (e.size()/512)+1
783 entry.st_atime = int(e.atime())
784 entry.st_mtime = int(e.mtime())
785 entry.st_ctime = int(e.mtime())
789 def lookup(self, parent_inode, name):
790 name = unicode(name, self.encoding)
791 _logger.debug("arv-mount lookup: parent_inode %i name %s",
798 if parent_inode in self.inodes:
799 p = self.inodes[parent_inode]
801 inode = p.parent_inode
802 elif isinstance(p, Directory) and name in p:
803 inode = p[name].inode
806 return self.getattr(inode)
808 raise llfuse.FUSEError(errno.ENOENT)
810 def open(self, inode, flags):
811 if inode in self.inodes:
812 p = self.inodes[inode]
814 raise llfuse.FUSEError(errno.ENOENT)
816 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
817 raise llfuse.FUSEError(errno.EROFS)
819 if isinstance(p, Directory):
820 raise llfuse.FUSEError(errno.EISDIR)
822 fh = self._filehandles_counter
823 self._filehandles_counter += 1
824 self._filehandles[fh] = FileHandle(fh, p)
827 def read(self, fh, off, size):
828 _logger.debug("arv-mount read %i %i %i", fh, off, size)
829 if fh in self._filehandles:
830 handle = self._filehandles[fh]
832 raise llfuse.FUSEError(errno.EBADF)
835 handle.entry._atime = time.time()
838 with llfuse.lock_released:
839 return handle.entry.readfrom(off, size)
840 except arvados.errors.NotFoundError as e:
841 _logger.warning("Block not found: " + str(e))
842 raise llfuse.FUSEError(errno.EIO)
843 except Exception as e:
845 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the handle 'fh'; a handle that is not registered is ignored.'''
    self._filehandles.pop(fh, None)
851 def opendir(self, inode):
852 _logger.debug("arv-mount opendir: inode %i", inode)
854 if inode in self.inodes:
855 p = self.inodes[inode]
857 raise llfuse.FUSEError(errno.ENOENT)
859 if not isinstance(p, Directory):
860 raise llfuse.FUSEError(errno.ENOTDIR)
862 fh = self._filehandles_counter
863 self._filehandles_counter += 1
864 if p.parent_inode in self.inodes:
865 parent = self.inodes[p.parent_inode]
867 raise llfuse.FUSEError(errno.EIO)
870 p._atime = time.time()
872 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
875 def readdir(self, fh, off):
876 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
878 if fh in self._filehandles:
879 handle = self._filehandles[fh]
881 raise llfuse.FUSEError(errno.EBADF)
883 _logger.debug("arv-mount handle.entry %s", handle.entry)
886 while e < len(handle.entry):
887 if handle.entry[e][1].inode in self.inodes:
889 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
890 except UnicodeEncodeError:
def releasedir(self, fh):
    '''Forget the directory handle 'fh' that opendir() registered.

    A handle that is not registered is ignored, the same way release()
    treats file handles, so that a stale or repeated release cannot raise
    KeyError out of the request handler.
    '''
    self._filehandles.pop(fh, None)
898 st = llfuse.StatvfsData()
899 st.f_bsize = 64 * 1024
912 # The llfuse documentation recommends only overloading functions that
913 # are actually implemented, as the default implementation will raise ENOSYS.
914 # However, there is a bug in the llfuse default implementation of create()
915 # "create() takes exactly 5 positional arguments (6 given)" which will crash
917 # The workaround is to implement it with the proper number of parameters,
918 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Refuse every create request: always raises FUSEError with EROFS.

    The five positional parameters are accepted but never used.
    '''
    read_only_fs = errno.EROFS
    raise llfuse.FUSEError(read_only_fs)