2 # FUSE driver for Arvados Keep
8 from llfuse import FUSEError
22 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# Module-wide logger; the classes below report API and Keep errors through it.
24 _logger = logging.getLogger('arvados.arvados_fuse')
# NOTE(review): this excerpt omits some short source lines (the embedded line
# numbers jump, e.g. 41 -> 45), including the headers of the lazy accessor
# methods whose bodies appear below.
26 class SafeApi(object):
27 '''Threadsafe wrapper for API object. This stores and returns a different api
28 object per thread, because httplib2 which underlies apiclient is not
32 def __init__(self, config):
# Connection settings are read once from `config` and reused by every
# per-thread client created later.
33 self.host = config.get('ARVADOS_API_HOST')
34 self.token = config.get('ARVADOS_API_TOKEN')
35 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
# Attributes stored on a threading.local() are visible only to the thread that
# set them; this is what keeps each thread's clients separate.
36 self.local = threading.local()
# A single block cache shared by the KeepClients of all threads.
37 self.block_cache = arvados.KeepBlockCache()
# Lazily create this thread's API client on first use.
# NOTE(review): arvados.api is called positionally; presumably
# (version, cache, host, token, insecure) -- confirm against its signature.
40 if 'api' not in self.local.__dict__:
41 self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
# Lazily create this thread's KeepClient, bound to this thread's API client
# and to the shared block cache.
45 if 'keep' not in self.local.__dict__:
46 self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
47 return self.local.keep
# Resource accessors: each forwards to self.localapi(), the API client of the
# calling thread.
49 def collections(self):
50 return self.localapi().collections()
53 return self.localapi().links()
56 return self.localapi().groups()
59 return self.localapi().users()
# Presumably convertTime(t) (its def line is not in this excerpt; ObjectFile
# calls it by that name): parse an Arvados timestamp string into Unix seconds.
62 '''Parse Arvados timestamp to unix time.'''
# calendar.timegm interprets the parsed struct_time as UTC, matching the
# literal 'Z' suffix in the format; time.mktime would assume local time.
64 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
# A None argument (TypeError) or a non-matching string (ValueError) is handled
# by the except body, which this excerpt does not show.
# NOTE(review): the format has no fractional-seconds field, so a value such as
# "2014-10-01T12:00:00.123Z" takes the except path -- confirm the API never
# returns fractional seconds here.
65 except (TypeError, ValueError):
68 def sanitize_filename(dirty):
69 '''Remove troublesome characters from filenames.'''
70 # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
# NOTE(review): Directory.merge passes fn(i) here and merge's docstring allows
# fn to return None; any None handling lives on lines this excerpt omits.
# Drop C0 control characters (U+0000-U+001F), DEL (U+007F) and '/', which can
# never appear inside a single path component.
76 if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
77 # skip control characters and /
81 # strip leading - or ~ and leading/trailing whitespace
# Presumably so a name is not mistaken for a command-line option ('-') or a
# home-directory reference ('~'), as the essay linked above recommends.
# NOTE(review): among whitespace, lstrip only removes ' '; tabs and newlines
# were already dropped above, but other leading Unicode whitespace survives.
82 stripped = fn.lstrip("-~ ").rstrip()
89 class FreshBase(object):
90 '''Base class for maintaining fresh/stale state to determine when to update.'''
# _last_update: when the object was last refreshed; _atime: when it was last
# accessed (Operations.read and Operations.opendir assign it).
94 self._last_update = time.time()
95 self._atime = time.time()
98 # Mark the value as stale
102 # Test if the entries dict is stale.
# Polling test: stale once the object has been accessed more than _poll_time
# seconds after its last refresh. _poll_time is assigned by subclasses (e.g.
# TagsDirectory) or on lines not shown; any other conditions checked before
# this line are not visible in this excerpt.
107 return (self._last_update + self._poll_time) < self._atime
# Record the refresh time (enclosing method header not shown).
112 self._last_update = time.time()
117 class File(FreshBase):
118 '''Base for file objects.'''
# parent_inode: inode number of the containing directory.
# _mtime: modification time in Unix seconds (callers pass e.g. time.time(),
# a convertTime() result, or 0); defaults to 0.
120 def __init__(self, parent_inode, _mtime=0):
121 super(File, self).__init__()
123 self.parent_inode = parent_inode
# Read `size` bytes at byte offset `off`. The base body is not visible in this
# excerpt; StreamReaderFile and StringFile override it.
129 def readfrom(self, off, size):
136 class StreamReaderFile(File):
137 '''Wraps a StreamFileReader as a file.'''
# reader: one file of a parsed manifest stream (CollectionDirectory creates
# these from s.files()); _mtime is forwarded to File.
139 def __init__(self, parent_inode, reader, _mtime):
140 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
# Size as reported by the wrapped reader (method header not shown).
144 return self.reader.size()
# Delegates to the reader. Operations.read calls this with the llfuse lock
# released, so a slow fetch does not stall other FUSE requests.
146 def readfrom(self, off, size):
147 return self.reader.readfrom(off, size)
153 class StringFile(File):
154 '''Wrap a simple string as a file'''
155 def __init__(self, parent_inode, contents, _mtime):
156 super(StringFile, self).__init__(parent_inode, _mtime)
157 self.contents = contents
# Size is the length of the string (method header not shown).
160 return len(self.contents)
# Slicing clamps out-of-range bounds, so a read at or beyond the end returns
# an empty string rather than raising.
162 def readfrom(self, off, size):
163 return self.contents[off:(off+size)]
166 class ObjectFile(StringFile):
167 '''Wrap a dict as a serialized json object.'''
# obj: an API record dict; its 'uuid' is kept for identity checks (see
# ProjectDirectory's `same` callback). Contents start empty and are filled by
# update() -- presumably called from here on a line not shown.
169 def __init__(self, parent_inode, obj):
170 super(ObjectFile, self).__init__(parent_inode, "", 0)
171 self.uuid = obj['uuid']
# Refresh from a new copy of the record: mtime comes from 'modified_at' when
# present (0 otherwise); contents become indented JSON with sorted keys plus a
# trailing newline, so the text is deterministic for a given record.
174 def update(self, obj):
175 self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
176 self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
179 class Directory(FreshBase):
180 '''Generic directory object, backed by a dict.
181 Consists of a set of entries with the key representing the filename
182 and the value referencing a File or Directory object.
185 def __init__(self, parent_inode):
186 super(Directory, self).__init__()
188 '''parent_inode is the integer inode number'''
# NOTE(review): the string above follows a statement, so Python does not treat
# it as the method docstring; it is an inert expression.
# Non-int parents are rejected because lookup('..') and opendir() use this
# value as a key into the inode table.
# NOTE(review): TypeError would be the conventional exception type here.
190 if not isinstance(parent_inode, int):
191 raise Exception("parent_inode should be an int")
192 self.parent_inode = parent_inode
# Starts at creation time; line 273 (apparently at the end of merge) sets it
# again, possibly only when entries changed -- the condition is not visible.
194 self._mtime = time.time()
196 # Overridden by subclasses to implement logic to update the entries dict
197 # when the directory is stale
201 # Only used when computing the size of the disk footprint of the directory
# Refresh hook: presumably runs update() when stale(). API HTTP errors from
# the update are caught here (the handler body is not visible in this excerpt).
206 def checkupdate(self):
210 except apiclient.errors.HttpError as e:
213 def __getitem__(self, item):
215 return self._entries[item]
219 return self._entries.items()
# NOTE(review): iterkeys() exists only in Python 2.
223 return self._entries.iterkeys()
225 def __contains__(self, k):
227 return k in self._entries
229 def merge(self, items, fn, same, new_entry):
230 '''Helper method for updating the contents of the directory. Takes a list
231 describing the new contents of the directory, reuse entries that are
232 the same in both the old and new lists, create new entries, and delete
233 old entries missing from the new list.
235 items: iterable with new directory contents
237 fn: function to take an entry in 'items' and return the desired file or
238 directory name, or None if this entry should be skipped
240 same: function to compare an existing entry (a File or Directory
241 object) with an entry in the items list to determine whether to keep
244 new_entry: function to create a new directory entry (File or Directory
245 object) from an entry in the items list.
249 oldentries = self._entries
# Names are sanitized before use, so `same` compares against the entry stored
# under the sanitized name.
253 name = sanitize_filename(fn(i))
255 if name in oldentries and same(oldentries[name], i):
256 # move existing directory entry over
257 self._entries[name] = oldentries[name]
260 # create new directory entry
# add_entry() assigns the inode number and returns the entry itself.
263 self._entries[name] = self.inodes.add_entry(ent)
266 # delete any other directory entries that were not found in 'items'
# Ask the kernel to drop its cached lookup for the removed name, then release
# the inode.
268 llfuse.invalidate_entry(self.inode, str(i))
269 self.inodes.del_entry(oldentries[i])
273 self._mtime = time.time()
278 '''Delete all entries'''
279 oldentries = self._entries
# Subdirectories get extra handling on the following (hidden) line before
# every entry is invalidated and released.
282 if isinstance(n, Directory):
284 llfuse.invalidate_entry(self.inode, str(n))
285 self.inodes.del_entry(oldentries[n])
292 class CollectionDirectory(Directory):
293 '''Represents the root of a directory tree holding a collection.'''
295 def __init__(self, parent_inode, inodes, api, collection):
296 super(CollectionDirectory, self).__init__(parent_inode)
299 self.collection_object_file = None
300 self.collection_object = None
301 if isinstance(collection, dict):
302 self.collection_locator = collection['uuid']
304 self.collection_locator = collection
307 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
309 def new_collection(self, new_collection_object):
310 self.collection_object = new_collection_object
312 if self.collection_object_file is not None:
313 self.collection_object_file.update(self.collection_object)
316 collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api, self.api.localkeep())
317 for s in collection.all_streams():
319 for part in s.name().split('/'):
320 if part != '' and part != '.':
321 partname = sanitize_filename(part)
322 if partname not in cwd._entries:
323 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
324 cwd = cwd._entries[partname]
325 for k, v in s.files().items():
326 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
330 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
333 with llfuse.lock_released:
334 new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
335 if "portable_data_hash" not in new_collection_object:
336 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
337 # end with llfuse.lock_released, re-acquire lock
339 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
340 self.new_collection(new_collection_object)
344 except apiclient.errors.HttpError as e:
345 if e.resp.status == 404:
346 _logger.warn("arv-mount %s: not found", self.collection_locator)
348 _logger.error("arv-mount %s: error", self.collection_locator)
349 _logger.exception(detail)
350 except arvados.errors.ArgumentError as detail:
351 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
352 if self.collection_object is not None and "manifest_text" in self.collection_object:
353 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
354 except Exception as detail:
355 _logger.error("arv-mount %s: error", self.collection_locator)
356 if self.collection_object is not None and "manifest_text" in self.collection_object:
357 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
358 _logger.exception(detail)
361 def __getitem__(self, item):
363 if item == '.arvados#collection':
364 if self.collection_object_file is None:
365 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
366 self.inodes.add_entry(self.collection_object_file)
367 return self.collection_object_file
369 return super(CollectionDirectory, self).__getitem__(item)
371 def __contains__(self, k):
372 if k == '.arvados#collection':
375 return super(CollectionDirectory, self).__contains__(k)
379 return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
382 class MagicDirectory(Directory):
383 '''A special directory that logically contains the set of all extant keep
384 locators. When a file is referenced by lookup(), it is tested to see if it
385 is a valid keep locator to a manifest, and if so, loads the manifest
386 contents as a subdirectory of this directory with the locator as the
387 directory name. Since querying a list of all extant keep locators is
388 impractical, only collections that have already been accessed are visible
392 def __init__(self, parent_inode, inodes, api):
393 super(MagicDirectory, self).__init__(parent_inode)
396 # Have to defer creating readme_file because at this point we don't
397 # yet have an inode assigned.
398 self.readme_file = None
# Adds the README entry once; later calls do nothing. Must run after this
# directory has an inode, because the file's parent is self.inode.
400 def create_readme(self):
401 if self.readme_file is None:
402 text = '''This directory provides access to Arvados collections as subdirectories listed
403 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
404 the form '1234567890abcdefghijklmnopqrstuv+123').
406 Note that this directory will appear empty until you attempt to access a
407 specific collection subdirectory (such as trying to 'cd' into it), at which
408 point the collection will actually be looked up on the server and the directory
409 will appear if it exists.
411 self.readme_file = self.inodes.add_entry(StringFile(self.inode, text, time.time()))
412 self._entries["README"] = self.readme_file
# Membership test with side effects: a name shaped like a uuid or portable
# data hash is probed by registering a CollectionDirectory for it (whether the
# entry is kept is decided on lines not shown here).
414 def __contains__(self, k):
417 if k in self._entries:
420 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
424 e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
# NOTE(review): any failure during the probe is logged only at debug level.
430 except Exception as e:
431 _logger.debug('arv-mount exception keep %s', e)
436 return self._entries.items()
438 def __getitem__(self, item):
440 return self._entries[item]
442 raise KeyError("No collection with id " + item)
# A Directory whose invalidate() also invalidates every child entry.
445 class RecursiveInvalidateDirectory(Directory):
446 def invalidate(self):
# The global llfuse lock is taken only when this directory is the filesystem
# root, presumably because that call comes from a thread outside the llfuse
# request handlers (which already hold the lock) -- confirm with the caller.
447 if self.inode == llfuse.ROOT_INODE:
448 llfuse.lock.acquire()
450 super(RecursiveInvalidateDirectory, self).invalidate()
451 for a in self._entries:
452 self._entries[a].invalidate()
# The handler body is not visible in this excerpt.
453 except Exception as e:
# Released under the same condition it was acquired; presumably inside a
# `finally:` clause on a hidden line.
456 if self.inode == llfuse.ROOT_INODE:
457 llfuse.lock.release()
460 class TagsDirectory(RecursiveInvalidateDirectory):
461 '''A special directory that contains as subdirectories all tags visible to the user.'''
463 def __init__(self, parent_inode, inodes, api, poll_time=60):
464 super(TagsDirectory, self).__init__(parent_inode)
468 self._poll_time = poll_time
471 with llfuse.lock_released:
472 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
474 self.merge(tags['items'],
475 lambda i: i['name'] if 'name' in i else i['uuid'],
476 lambda a, i: a.tag == i,
477 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
480 class TagDirectory(Directory):
481 '''A special directory that contains as subdirectories all collections visible
482 to the user that are tagged with a particular tag.
485 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
486 super(TagDirectory, self).__init__(parent_inode)
491 self._poll_time = poll_time
# Fetch the tag links that point at collections with the llfuse lock released
# (update method header not shown).
# NOTE(review): list().execute() returns a single page of results, whereas
# ProjectDirectory pages with arvados.util.list_all -- a tag applied to many
# collections may be truncated here; confirm the server's default limit.
494 with llfuse.lock_released:
495 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
496 ['name', '=', self.tag],
497 ['head_uuid', 'is_a', 'arvados#collection']],
498 select=['head_uuid']).execute()
# One CollectionDirectory per tagged collection, named by its uuid
# (head_uuid); an existing entry is kept when its locator matches.
499 self.merge(taggedcollections['items'],
500 lambda i: i['head_uuid'],
501 lambda a, i: a.collection_locator == i['head_uuid'],
502 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))
505 class ProjectDirectory(Directory):
506 '''A special directory that contains the contents of a project.'''
508 def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
509 super(ProjectDirectory, self).__init__(parent_inode)
512 self.project_object = project_object
513 self.project_object_file = None
514 self.uuid = project_object['uuid']
516 def createDirectory(self, i):
517 if collection_uuid_pattern.match(i['uuid']):
518 return CollectionDirectory(self.inode, self.inodes, self.api, i)
519 elif group_uuid_pattern.match(i['uuid']):
520 return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
521 elif link_uuid_pattern.match(i['uuid']):
522 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
523 return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
526 elif uuid_pattern.match(i['uuid']):
527 return ObjectFile(self.parent_inode, i)
532 if self.project_object_file == None:
533 self.project_object_file = ObjectFile(self.inode, self.project_object)
534 self.inodes.add_entry(self.project_object_file)
538 if i['name'] is None or len(i['name']) == 0:
540 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
541 # collection or subproject
543 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
546 elif 'kind' in i and i['kind'].startswith('arvados#'):
548 return "{}.{}".format(i['name'], i['kind'][8:])
553 if isinstance(a, CollectionDirectory):
554 return a.collection_locator == i['uuid']
555 elif isinstance(a, ProjectDirectory):
556 return a.uuid == i['uuid']
557 elif isinstance(a, ObjectFile):
558 return a.uuid == i['uuid'] and not a.stale()
561 with llfuse.lock_released:
562 if group_uuid_pattern.match(self.uuid):
563 self.project_object = self.api.groups().get(uuid=self.uuid).execute()
564 elif user_uuid_pattern.match(self.uuid):
565 self.project_object = self.api.users().get(uuid=self.uuid).execute()
567 contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
568 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
569 contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
571 # end with llfuse.lock_released, re-acquire lock
576 self.createDirectory)
578 def __getitem__(self, item):
580 if item == '.arvados#project':
581 return self.project_object_file
583 return super(ProjectDirectory, self).__getitem__(item)
585 def __contains__(self, k):
586 if k == '.arvados#project':
589 return super(ProjectDirectory, self).__contains__(k)
592 class SharedDirectory(Directory):
593 '''A special directory that represents users or groups who have shared projects with me.'''
# NOTE(review): the constructor makes a blocking API call (users().current())
# using the `api` argument directly; whether the llfuse lock is held at that
# point depends on the caller, which is not visible here.
595 def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
596 super(SharedDirectory, self).__init__(parent_inode)
597 self.current_user = api.users().current().execute()
601 self._poll_time = poll_time
# All API traffic for a refresh happens with the llfuse lock released.
604 with llfuse.lock_released:
605 all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
# Index every visible project by uuid.
607 for ob in all_projects:
608 objects[ob['uuid']] = ob
# Collect "root owners": owners of visible projects that are neither the
# current user nor one of the projects indexed above -- presumably the users
# or groups through which projects were shared.
612 for ob in all_projects:
613 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
615 root_owners[ob['owner_uuid']] = True
# Resolve root owners with one filtered users query and one groups query.
617 lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
618 lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
624 objects[l["uuid"]] = l
626 objects[l["uuid"]] = l
# Name each root owner's entry by its 'name'; records that have a 'first_name'
# (users) also get a "<first_name> <last_name>" entry. Guard conditions are on
# hidden lines.
629 for r in root_owners:
633 contents[obr["name"]] = obr
634 if "first_name" in obr:
635 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
# Records whose owner is not in `objects` are listed by name as well (the loop
# header binding `r` here is on a hidden line).
638 if r['owner_uuid'] not in objects:
639 contents[r['name']] = r
641 # end with llfuse.lock_released, re-acquire lock
# Each (name, record) pair becomes a ProjectDirectory; an existing entry is
# kept when its uuid matches the record's.
644 self.merge(contents.items(),
646 lambda a, i: a.uuid == i[1]['uuid'],
647 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
# NOTE(review): this broad handler covers the whole refresh; its body is not
# visible -- make sure it at least logs the exception.
648 except Exception as e:
652 class FileHandle(object):
653 '''Connects a numeric file handle to a File or Directory object that has
654 been opened by the client.'''
# fh: the handle number given to the kernel. entry: for open() the File object;
# for opendir() a snapshot list of (name, entry) pairs consumed by readdir(),
# not a Directory object as the docstring suggests.
656 def __init__(self, fh, entry):
661 class Inodes(object):
662 '''Manage the set of inodes. This is the mapping from a numeric id
663 to a concrete File or Directory object'''
# Numbering starts at llfuse.ROOT_INODE, so (assuming the counter is bumped
# after each assignment) the first entry added becomes the filesystem root.
667 self._counter = llfuse.ROOT_INODE
669 def __getitem__(self, item):
670 return self._entries[item]
672 def __setitem__(self, key, item):
673 self._entries[key] = item
# Iterates over inode numbers (Python 2 iterkeys; method header not shown).
676 return self._entries.iterkeys()
679 return self._entries.items()
681 def __contains__(self, k):
682 return k in self._entries
# Give `entry` the next inode number and register it. The counter increment
# and return statement are on hidden lines; callers use the return value as
# the entry itself.
684 def add_entry(self, entry):
685 entry.inode = self._counter
686 self._entries[entry.inode] = entry
# Ask the kernel to invalidate its cached data for the inode, then forget it.
690 def del_entry(self, entry):
691 llfuse.invalidate_inode(entry.inode)
692 del self._entries[entry.inode]
694 class Operations(llfuse.Operations):
695 '''This is the main interface with llfuse. The methods on this object are
696 called by llfuse threads to service FUSE events to query and read from
699 llfuse has its own global lock which is acquired before calling a request handler,
700 so request handlers do not run concurrently unless the lock is explicitly released
701 using "with llfuse.lock_released:"'''
# uid/gid: ownership reported for every inode by getattr().
703 def __init__(self, uid, gid):
704 super(Operations, self).__init__()
706 self.inodes = Inodes()
710 # dict of file handle number to FileHandle
711 self._filehandles = {}
712 self._filehandles_counter = 1
714 # Other threads that need to wait until the fuse driver
715 # is fully initialized should wait() on this event object.
716 self.initlock = threading.Event()
719 # Allow threads that are waiting for the driver to be finished
720 # initializing to continue
723 def access(self, inode, mode, ctx):
# Attributes for one inode: read permission for everyone (directories also get
# search/execute), the mount's uid/gid, and size and times from the entry. The
# kernel may cache the entry and its attributes for 300 seconds.
726 def getattr(self, inode):
727 if inode not in self.inodes:
728 raise llfuse.FUSEError(errno.ENOENT)
730 e = self.inodes[inode]
732 entry = llfuse.EntryAttributes()
735 entry.entry_timeout = 300
736 entry.attr_timeout = 300
738 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
739 if isinstance(e, Directory):
740 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
742 entry.st_mode |= stat.S_IFREG
745 entry.st_uid = self.uid
746 entry.st_gid = self.gid
749 entry.st_size = e.size()
751 entry.st_blksize = 512
# NOTE(review): `/` is integer division only under Python 2 (which this file
# targets, cf. iterkeys); `//` would keep this an int under Python 3. The +1
# also over-counts by one block when the size is a multiple of 512.
752 entry.st_blocks = (e.size()/512)+1
# ctime is reported as the entry's mtime.
753 entry.st_atime = int(e.atime())
754 entry.st_mtime = int(e.mtime())
755 entry.st_ctime = int(e.mtime())
# Resolve `name` inside directory `parent_inode` and return its attributes.
# The stored parent is used for the special name checked on a hidden line
# (presumably '..'); unknown names raise ENOENT.
759 def lookup(self, parent_inode, name):
760 _logger.debug("arv-mount lookup: parent_inode %i name %s",
767 if parent_inode in self.inodes:
768 p = self.inodes[parent_inode]
770 inode = p.parent_inode
771 elif isinstance(p, Directory) and name in p:
772 inode = p[name].inode
775 return self.getattr(inode)
777 raise llfuse.FUSEError(errno.ENOENT)
# Read-only filesystem: write access fails with EROFS and directories with
# EISDIR; otherwise a new handle number is allocated for the file.
779 def open(self, inode, flags):
780 if inode in self.inodes:
781 p = self.inodes[inode]
783 raise llfuse.FUSEError(errno.ENOENT)
785 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
786 raise llfuse.FUSEError(errno.EROFS)
788 if isinstance(p, Directory):
789 raise llfuse.FUSEError(errno.EISDIR)
791 fh = self._filehandles_counter
792 self._filehandles_counter += 1
793 self._filehandles[fh] = FileHandle(fh, p)
796 def read(self, fh, off, size):
797 _logger.debug("arv-mount read %i %i %i", fh, off, size)
798 if fh in self._filehandles:
799 handle = self._filehandles[fh]
801 raise llfuse.FUSEError(errno.EBADF)
# Record the access time (consulted by FreshBase's polling staleness test),
# then read with the llfuse lock released so other requests can run during
# the fetch.
804 handle.entry._atime = time.time()
807 with llfuse.lock_released:
808 return handle.entry.readfrom(off, size)
# A missing Keep block is reported to the client as EIO.
809 except arvados.errors.NotFoundError as e:
810 _logger.warning("Block not found: " + str(e))
811 raise llfuse.FUSEError(errno.EIO)
812 except Exception as e:
814 raise llfuse.FUSEError(errno.EIO)
816 def release(self, fh):
817 if fh in self._filehandles:
818 del self._filehandles[fh]
820 def opendir(self, inode):
821 _logger.debug("arv-mount opendir: inode %i", inode)
823 if inode in self.inodes:
824 p = self.inodes[inode]
826 raise llfuse.FUSEError(errno.ENOENT)
828 if not isinstance(p, Directory):
829 raise llfuse.FUSEError(errno.ENOTDIR)
831 fh = self._filehandles_counter
832 self._filehandles_counter += 1
833 if p.parent_inode in self.inodes:
834 parent = self.inodes[p.parent_inode]
836 raise llfuse.FUSEError(errno.EIO)
839 p._atime = time.time()
# Snapshot the listing at open time: '.', '..', then the current entries, as
# (name, entry) pairs for readdir().
841 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
844 def readdir(self, fh, off):
845 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
847 if fh in self._filehandles:
848 handle = self._filehandles[fh]
850 raise llfuse.FUSEError(errno.EBADF)
852 _logger.debug("arv-mount handle.entry %s", handle.entry)
# Walk the snapshot from the requested offset (`e` is initialized on a hidden
# line, presumably to `off`); each yielded tuple carries the offset of the
# next pair (e+1). Pairs whose inode was removed since opendir() are skipped.
855 while e < len(handle.entry):
856 if handle.entry[e][1].inode in self.inodes:
857 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
# NOTE(review): unlike release(), this raises KeyError for an unknown handle.
860 def releasedir(self, fh):
861 del self._filehandles[fh]
# Filesystem statistics (method header not shown); block size is 64 KiB.
864 st = llfuse.StatvfsData()
865 st.f_bsize = 64 * 1024
878 # The llfuse documentation recommends only overloading functions that
879 # are actually implemented, as the default implementation will raise ENOSYS.
880 # However, there is a bug in the llfuse default implementation of create()
881 # "create() takes exactly 5 positional arguments (6 given)" which will crash
883 # The workaround is to implement it with the proper number of parameters,
884 # and then everything works out.
885 def create(self, p1, p2, p3, p4, p5):
886 raise llfuse.FUSEError(errno.EROFS)