2 # FUSE driver for Arvados Keep
8 from llfuse import FUSEError
23 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
25 _logger = logging.getLogger('arvados.arvados_fuse')
27 class SafeApi(object):
28 '''Threadsafe wrapper for API object. This stores and returns a different api
29 object per thread, because httplib2 which underlies apiclient is not
def __init__(self, config):
    '''Record the API connection settings found in ``config`` and prepare
    the per-thread storage used by this wrapper.

    config: settings object providing get() and flag_is_true().
    '''
    # Holder for per-thread state, plus one block cache kept on the wrapper.
    self.local = threading.local()
    self.block_cache = arvados.KeepBlockCache()

    # Connection parameters, read once at construction time.
    self.host = config.get('ARVADOS_API_HOST')
    self.api_token = config.get('ARVADOS_API_TOKEN')
    self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
41 if 'api' not in self.local.__dict__:
42 self.local.api = arvados.api('v1', False, self.host,
43 self.api_token, self.insecure)
47 if 'keep' not in self.local.__dict__:
48 self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
49 return self.local.keep
51 def __getattr__(self, name):
52 # Proxy nonexistent attributes to the local API client.
54 return getattr(self.localapi(), name)
55 except AttributeError:
56 return super(SafeApi, self).__getattr__(name)
60 '''Parse Arvados timestamp to unix time.'''
62 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
63 except (TypeError, ValueError):
66 def sanitize_filename(dirty):
67 '''Remove troublesome characters from filenames.'''
68 # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
74 if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
75 # skip control characters and /
79 # strip leading - or ~ and leading/trailing whitespace
80 stripped = fn.lstrip("-~ ").rstrip()
87 class FreshBase(object):
88 '''Base class for maintaining fresh/stale state to determine when to update.'''
92 self._last_update = time.time()
93 self._atime = time.time()
96 # Mark the value as stale
100 # Test if the entries dict is stale.
105 return (self._last_update + self._poll_time) < self._atime
110 self._last_update = time.time()
115 class File(FreshBase):
116 '''Base for file objects.'''
118 def __init__(self, parent_inode, _mtime=0):
119 super(File, self).__init__()
121 self.parent_inode = parent_inode
127 def readfrom(self, off, size):
134 class StreamReaderFile(File):
135 '''Wraps a StreamFileReader as a file.'''
137 def __init__(self, parent_inode, reader, _mtime):
138 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
142 return self.reader.size()
def readfrom(self, off, size):
    '''Return whatever the wrapped reader yields for ``size`` bytes
    starting at offset ``off``.'''
    reader = self.reader
    return reader.readfrom(off, size)
151 class StringFile(File):
152 '''Wrap a simple string as a file'''
def __init__(self, parent_inode, contents, _mtime):
    '''Create a file whose data is the in-memory value ``contents``.

    parent_inode and _mtime are passed straight through to the File
    initializer.
    '''
    super(StringFile, self).__init__(parent_inode, _mtime)
    # Served back by slicing, so keep the value exactly as given.
    self.contents = contents
158 return len(self.contents)
def readfrom(self, off, size):
    '''Return the slice of the stored contents that begins at ``off`` and
    spans at most ``size`` items; slicing past the end yields a short or
    empty result rather than an error.'''
    end = off + size
    return self.contents[off:end]
164 class ObjectFile(StringFile):
165 '''Wrap a dict as a serialized json object.'''
167 def __init__(self, parent_inode, obj):
168 super(ObjectFile, self).__init__(parent_inode, "", 0)
169 self.uuid = obj['uuid']
def update(self, obj):
    '''Refresh this file from the record ``obj``.

    The modification time comes from obj['modified_at'] when that key is
    present and is 0 otherwise.  The contents become the record dumped as
    JSON (4-space indent, keys sorted) followed by a newline.
    '''
    if 'modified_at' in obj:
        self._mtime = convertTime(obj['modified_at'])
    else:
        self._mtime = 0

    serialized = json.dumps(obj, indent=4, sort_keys=True)
    self.contents = serialized + "\n"
177 class Directory(FreshBase):
178 '''Generic directory object, backed by a dict.
179 Consists of a set of entries with the key representing the filename
180 and the value referencing a File or Directory object.
183 def __init__(self, parent_inode):
184 super(Directory, self).__init__()
186 '''parent_inode is the integer inode number'''
188 if not isinstance(parent_inode, int):
189 raise Exception("parent_inode should be an int")
190 self.parent_inode = parent_inode
192 self._mtime = time.time()
194 # Overridden by subclasses to implement logic to update the entries dict
195 # when the directory is stale
199 # Only used when computing the size of the disk footprint of the directory
204 def checkupdate(self):
208 except apiclient.errors.HttpError as e:
211 def __getitem__(self, item):
213 return self._entries[item]
217 return self._entries.items()
221 return self._entries.iterkeys()
223 def __contains__(self, k):
225 return k in self._entries
227 def merge(self, items, fn, same, new_entry):
228 '''Helper method for updating the contents of the directory. Takes a list
229 describing the new contents of the directory, reuse entries that are
230 the same in both the old and new lists, create new entries, and delete
231 old entries missing from the new list.
233 items: iterable with new directory contents
235 fn: function to take an entry in 'items' and return the desired file or
236 directory name, or None if this entry should be skipped
238 same: function to compare an existing entry (a File or Directory
239 object) with an entry in the items list to determine whether to keep
242 new_entry: function to create a new directory entry (File or Directory
243 object) from an entry in the items list.
247 oldentries = self._entries
251 name = sanitize_filename(fn(i))
253 if name in oldentries and same(oldentries[name], i):
254 # move existing directory entry over
255 self._entries[name] = oldentries[name]
258 # create new directory entry
261 self._entries[name] = self.inodes.add_entry(ent)
264 # delete any other directory entries that were not found in 'items'
266 llfuse.invalidate_entry(self.inode, str(i))
267 self.inodes.del_entry(oldentries[i])
271 self._mtime = time.time()
276 '''Delete all entries'''
277 oldentries = self._entries
280 if isinstance(n, Directory):
282 llfuse.invalidate_entry(self.inode, str(n))
283 self.inodes.del_entry(oldentries[n])
290 class CollectionDirectory(Directory):
291 '''Represents the root of a directory tree holding a collection.'''
293 def __init__(self, parent_inode, inodes, api, num_retries, collection):
294 super(CollectionDirectory, self).__init__(parent_inode)
297 self.num_retries = num_retries
298 self.collection_object_file = None
299 self.collection_object = None
300 if isinstance(collection, dict):
301 self.collection_locator = collection['uuid']
303 self.collection_locator = collection
306 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
308 def new_collection(self, new_collection_object):
309 self.collection_object = new_collection_object
311 if self.collection_object_file is not None:
312 self.collection_object_file.update(self.collection_object)
315 collection = arvados.CollectionReader(
316 self.collection_object["manifest_text"], self.api,
317 self.api.localkeep(), num_retries=self.num_retries)
318 collection.normalize()
319 for s in collection.all_streams():
321 for part in s.name().split('/'):
322 if part != '' and part != '.':
323 partname = sanitize_filename(part)
324 if partname not in cwd._entries:
325 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
326 cwd = cwd._entries[partname]
327 for k, v in s.files().items():
328 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
332 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
335 with llfuse.lock_released:
336 new_collection_object = self.api.collections().get(
337 uuid=self.collection_locator
338 ).execute(num_retries=self.num_retries)
339 if "portable_data_hash" not in new_collection_object:
340 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
341 # end with llfuse.lock_released, re-acquire lock
343 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
344 self.new_collection(new_collection_object)
348 except apiclient.errors.HttpError as e:
349 if e.resp.status == 404:
350 _logger.warn("arv-mount %s: not found", self.collection_locator)
352 _logger.error("arv-mount %s: error", self.collection_locator)
353 _logger.exception(detail)
354 except arvados.errors.ArgumentError as detail:
355 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
356 if self.collection_object is not None and "manifest_text" in self.collection_object:
357 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
358 except Exception as detail:
359 _logger.error("arv-mount %s: error", self.collection_locator)
360 if self.collection_object is not None and "manifest_text" in self.collection_object:
361 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
362 _logger.exception(detail)
365 def __getitem__(self, item):
367 if item == '.arvados#collection':
368 if self.collection_object_file is None:
369 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
370 self.inodes.add_entry(self.collection_object_file)
371 return self.collection_object_file
373 return super(CollectionDirectory, self).__getitem__(item)
375 def __contains__(self, k):
376 if k == '.arvados#collection':
379 return super(CollectionDirectory, self).__contains__(k)
383 return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
386 class MagicDirectory(Directory):
387 '''A special directory that logically contains the set of all extant keep
388 locators. When a file is referenced by lookup(), it is tested to see if it
389 is a valid keep locator to a manifest, and if so, loads the manifest
390 contents as a subdirectory of this directory with the locator as the
391 directory name. Since querying a list of all extant keep locators is
392 impractical, only collections that have already been accessed are visible
396 def __init__(self, parent_inode, inodes, api, num_retries):
397 super(MagicDirectory, self).__init__(parent_inode)
400 self.num_retries = num_retries
401 # Have to defer creating readme_file because at this point we don't
402 # yet have an inode assigned.
403 self.readme_file = None
405 def create_readme(self):
406 if self.readme_file is None:
407 text = '''This directory provides access to Arvados collections as subdirectories listed
408 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
409 the form '1234567890abcdefghijklmnopqrstuv+123').
411 Note that this directory will appear empty until you attempt to access a
412 specific collection subdirectory (such as trying to 'cd' into it), at which
413 point the collection will actually be looked up on the server and the directory
414 will appear if it exists.
416 self.readme_file = self.inodes.add_entry(StringFile(self.inode, text, time.time()))
417 self._entries["README"] = self.readme_file
419 def __contains__(self, k):
422 if k in self._entries:
425 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
429 e = self.inodes.add_entry(CollectionDirectory(
430 self.inode, self.inodes, self.api, self.num_retries, k))
436 except Exception as e:
437 _logger.debug('arv-mount exception keep %s', e)
442 return self._entries.items()
444 def __getitem__(self, item):
446 return self._entries[item]
448 raise KeyError("No collection with id " + item)
451 class RecursiveInvalidateDirectory(Directory):
452 def invalidate(self):
453 if self.inode == llfuse.ROOT_INODE:
454 llfuse.lock.acquire()
456 super(RecursiveInvalidateDirectory, self).invalidate()
457 for a in self._entries:
458 self._entries[a].invalidate()
459 except Exception as e:
462 if self.inode == llfuse.ROOT_INODE:
463 llfuse.lock.release()
466 class TagsDirectory(RecursiveInvalidateDirectory):
467 '''A special directory that contains as subdirectories all tags visible to the user.'''
469 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
470 super(TagsDirectory, self).__init__(parent_inode)
473 self.num_retries = num_retries
475 self._poll_time = poll_time
478 with llfuse.lock_released:
479 tags = self.api.links().list(
480 filters=[['link_class', '=', 'tag']],
481 select=['name'], distinct=True
482 ).execute(num_retries=self.num_retries)
484 self.merge(tags['items'],
485 lambda i: i['name'] if 'name' in i else i['uuid'],
486 lambda a, i: a.tag == i,
487 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
490 class TagDirectory(Directory):
491 '''A special directory that contains as subdirectories all collections visible
492 to the user that are tagged with a particular tag.
495 def __init__(self, parent_inode, inodes, api, num_retries, tag,
496 poll=False, poll_time=60):
497 super(TagDirectory, self).__init__(parent_inode)
500 self.num_retries = num_retries
503 self._poll_time = poll_time
506 with llfuse.lock_released:
507 taggedcollections = self.api.links().list(
508 filters=[['link_class', '=', 'tag'],
509 ['name', '=', self.tag],
510 ['head_uuid', 'is_a', 'arvados#collection']],
512 ).execute(num_retries=self.num_retries)
513 self.merge(taggedcollections['items'],
514 lambda i: i['head_uuid'],
515 lambda a, i: a.collection_locator == i['head_uuid'],
516 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
519 class ProjectDirectory(Directory):
520 '''A special directory that contains the contents of a project.'''
522 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
523 poll=False, poll_time=60):
524 super(ProjectDirectory, self).__init__(parent_inode)
527 self.num_retries = num_retries
528 self.project_object = project_object
529 self.project_object_file = None
530 self.uuid = project_object['uuid']
532 def createDirectory(self, i):
533 if collection_uuid_pattern.match(i['uuid']):
534 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
535 elif group_uuid_pattern.match(i['uuid']):
536 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
537 elif link_uuid_pattern.match(i['uuid']):
538 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
539 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
542 elif uuid_pattern.match(i['uuid']):
543 return ObjectFile(self.parent_inode, i)
548 if self.project_object_file == None:
549 self.project_object_file = ObjectFile(self.inode, self.project_object)
550 self.inodes.add_entry(self.project_object_file)
554 if i['name'] is None or len(i['name']) == 0:
556 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
557 # collection or subproject
559 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
562 elif 'kind' in i and i['kind'].startswith('arvados#'):
564 return "{}.{}".format(i['name'], i['kind'][8:])
569 if isinstance(a, CollectionDirectory):
570 return a.collection_locator == i['uuid']
571 elif isinstance(a, ProjectDirectory):
572 return a.uuid == i['uuid']
573 elif isinstance(a, ObjectFile):
574 return a.uuid == i['uuid'] and not a.stale()
577 with llfuse.lock_released:
578 if group_uuid_pattern.match(self.uuid):
579 self.project_object = self.api.groups().get(
580 uuid=self.uuid).execute(num_retries=self.num_retries)
581 elif user_uuid_pattern.match(self.uuid):
582 self.project_object = self.api.users().get(
583 uuid=self.uuid).execute(num_retries=self.num_retries)
585 contents = arvados.util.list_all(self.api.groups().contents,
586 self.num_retries, uuid=self.uuid)
587 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
588 contents += arvados.util.list_all(
589 self.api.links().list, self.num_retries,
590 filters=[['tail_uuid', '=', self.uuid],
591 ['link_class', '=', 'name']])
593 # end with llfuse.lock_released, re-acquire lock
598 self.createDirectory)
600 def __getitem__(self, item):
602 if item == '.arvados#project':
603 return self.project_object_file
605 return super(ProjectDirectory, self).__getitem__(item)
607 def __contains__(self, k):
608 if k == '.arvados#project':
611 return super(ProjectDirectory, self).__contains__(k)
614 class SharedDirectory(Directory):
615 '''A special directory that represents users or groups who have shared projects with me.'''
617 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
618 poll=False, poll_time=60):
619 super(SharedDirectory, self).__init__(parent_inode)
622 self.num_retries = num_retries
623 self.current_user = api.users().current().execute(num_retries=num_retries)
625 self._poll_time = poll_time
628 with llfuse.lock_released:
629 all_projects = arvados.util.list_all(
630 self.api.groups().list, self.num_retries,
631 filters=[['group_class','=','project']])
633 for ob in all_projects:
634 objects[ob['uuid']] = ob
638 for ob in all_projects:
639 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
641 root_owners[ob['owner_uuid']] = True
643 lusers = arvados.util.list_all(
644 self.api.users().list, self.num_retries,
645 filters=[['uuid','in', list(root_owners)]])
646 lgroups = arvados.util.list_all(
647 self.api.groups().list, self.num_retries,
648 filters=[['uuid','in', list(root_owners)]])
654 objects[l["uuid"]] = l
656 objects[l["uuid"]] = l
659 for r in root_owners:
663 contents[obr["name"]] = obr
664 if "first_name" in obr:
665 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
668 if r['owner_uuid'] not in objects:
669 contents[r['name']] = r
671 # end with llfuse.lock_released, re-acquire lock
674 self.merge(contents.items(),
676 lambda a, i: a.uuid == i[1]['uuid'],
677 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
678 except Exception as e:
682 class FileHandle(object):
683 '''Connects a numeric file handle to a File or Directory object that has
684 been opened by the client.'''
686 def __init__(self, fh, entry):
691 class Inodes(object):
692 '''Manage the set of inodes. This is the mapping from a numeric id
693 to a concrete File or Directory object'''
697 self._counter = llfuse.ROOT_INODE
def __getitem__(self, item):
    '''Return the entry stored under key ``item``; a missing key raises
    KeyError from the underlying mapping.'''
    entries = self._entries
    return entries[item]
def __setitem__(self, key, item):
    '''Store ``item`` under ``key``, replacing any entry already there.'''
    entries = self._entries
    entries[key] = item
706 return self._entries.iterkeys()
709 return self._entries.items()
def __contains__(self, k):
    '''Report whether ``k`` is a key of the entries mapping.'''
    present = k in self._entries
    return present
714 def add_entry(self, entry):
715 entry.inode = self._counter
716 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Remove ``entry`` from the entries mapping, first calling
    llfuse.invalidate_inode() on its inode number.'''
    inode = entry.inode
    llfuse.invalidate_inode(inode)
    del self._entries[inode]
724 class Operations(llfuse.Operations):
725 '''This is the main interface with llfuse. The methods on this object are
726 called by llfuse threads to service FUSE events to query and read from
729 llfuse has its own global lock which is acquired before calling a request handler,
730 so request handlers do not run concurrently unless the lock is explicitly released
731 using "with llfuse.lock_released:"'''
733 def __init__(self, uid, gid, encoding="utf-8"):
734 super(Operations, self).__init__()
736 self.inodes = Inodes()
739 self.encoding = encoding
741 # dict of inode to filehandle
742 self._filehandles = {}
743 self._filehandles_counter = 1
745 # Other threads that need to wait until the fuse driver
746 # is fully initialized should wait() on this event object.
747 self.initlock = threading.Event()
750 # Allow threads that are waiting for the driver to be finished
751 # initializing to continue
754 def access(self, inode, mode, ctx):
757 def getattr(self, inode):
758 if inode not in self.inodes:
759 raise llfuse.FUSEError(errno.ENOENT)
761 e = self.inodes[inode]
763 entry = llfuse.EntryAttributes()
766 entry.entry_timeout = 300
767 entry.attr_timeout = 300
769 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
770 if isinstance(e, Directory):
771 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
773 entry.st_mode |= stat.S_IFREG
776 entry.st_uid = self.uid
777 entry.st_gid = self.gid
780 entry.st_size = e.size()
782 entry.st_blksize = 512
783 entry.st_blocks = (e.size()/512)+1
784 entry.st_atime = int(e.atime())
785 entry.st_mtime = int(e.mtime())
786 entry.st_ctime = int(e.mtime())
790 def lookup(self, parent_inode, name):
791 name = unicode(name, self.encoding)
792 _logger.debug("arv-mount lookup: parent_inode %i name %s",
799 if parent_inode in self.inodes:
800 p = self.inodes[parent_inode]
802 inode = p.parent_inode
803 elif isinstance(p, Directory) and name in p:
804 inode = p[name].inode
807 return self.getattr(inode)
809 raise llfuse.FUSEError(errno.ENOENT)
811 def open(self, inode, flags):
812 if inode in self.inodes:
813 p = self.inodes[inode]
815 raise llfuse.FUSEError(errno.ENOENT)
817 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
818 raise llfuse.FUSEError(errno.EROFS)
820 if isinstance(p, Directory):
821 raise llfuse.FUSEError(errno.EISDIR)
823 fh = self._filehandles_counter
824 self._filehandles_counter += 1
825 self._filehandles[fh] = FileHandle(fh, p)
828 def read(self, fh, off, size):
829 _logger.debug("arv-mount read %i %i %i", fh, off, size)
830 if fh in self._filehandles:
831 handle = self._filehandles[fh]
833 raise llfuse.FUSEError(errno.EBADF)
836 handle.entry._atime = time.time()
839 with llfuse.lock_released:
840 return handle.entry.readfrom(off, size)
841 except arvados.errors.NotFoundError as e:
842 _logger.warning("Block not found: " + str(e))
843 raise llfuse.FUSEError(errno.EIO)
844 except Exception as e:
846 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the handle ``fh``; a handle that is not currently tracked
    is silently ignored.'''
    self._filehandles.pop(fh, None)
852 def opendir(self, inode):
853 _logger.debug("arv-mount opendir: inode %i", inode)
855 if inode in self.inodes:
856 p = self.inodes[inode]
858 raise llfuse.FUSEError(errno.ENOENT)
860 if not isinstance(p, Directory):
861 raise llfuse.FUSEError(errno.ENOTDIR)
863 fh = self._filehandles_counter
864 self._filehandles_counter += 1
865 if p.parent_inode in self.inodes:
866 parent = self.inodes[p.parent_inode]
868 raise llfuse.FUSEError(errno.EIO)
871 p._atime = time.time()
873 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
876 def readdir(self, fh, off):
877 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
879 if fh in self._filehandles:
880 handle = self._filehandles[fh]
882 raise llfuse.FUSEError(errno.EBADF)
884 _logger.debug("arv-mount handle.entry %s", handle.entry)
887 while e < len(handle.entry):
888 if handle.entry[e][1].inode in self.inodes:
890 yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
891 except UnicodeEncodeError:
def releasedir(self, fh):
    '''Forget the directory handle ``fh``.

    A handle that is unknown or has already been released is ignored,
    the same way release() treats file handles, instead of letting a
    KeyError escape from the request handler.
    '''
    self._filehandles.pop(fh, None)
899 st = llfuse.StatvfsData()
900 st.f_bsize = 64 * 1024
913 # The llfuse documentation recommends only overloading functions that
914 # are actually implemented, as the default implementation will raise ENOSYS.
915 # However, there is a bug in the llfuse default implementation of create()
916 # "create() takes exactly 5 positional arguments (6 given)" which will crash
918 # The workaround is to implement it with the proper number of parameters,
919 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Reject the request: always raises llfuse.FUSEError carrying
    errno.EROFS.  The five positional parameters are accepted and not
    otherwise used.'''
    read_only = llfuse.FUSEError(errno.EROFS)
    raise read_only