# FUSE driver for Arvados Keep

import os, errno, stat, json, time, calendar, threading, logging

import arvados
import apiclient.errors
import llfuse
from llfuse import FUSEError

from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern

_logger = logging.getLogger('arvados.arvados_fuse')

class SafeApi(object):
    '''Threadsafe wrapper for API object. This stores and returns a different api
    object per thread, because httplib2, which underlies apiclient, is not
    threadsafe.
    '''

    def __init__(self, config):
        self.host = config.get('ARVADOS_API_HOST')
        self.token = config.get('ARVADOS_API_TOKEN')
        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        self.local = threading.local()

    def localapi(self):
        if 'api' not in self.local.__dict__:
            self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
        return self.local.api

    def collections(self):
        return self.localapi().collections()

    def links(self):
        return self.localapi().links()

    def groups(self):
        return self.localapi().groups()

    def users(self):
        return self.localapi().users()

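# Illustrative usage (mine, not from the original source; assumes a config
# object such as the arvados.config module, which provides get() and
# flag_is_true()):
#
#   api = SafeApi(arvados.config)
#   api.collections().get(uuid=some_locator).execute()   # 'some_locator' is hypothetical
#
# Each calling thread transparently gets its own apiclient instance the first
# time it uses one of these accessors.
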
def convertTime(t):
    '''Parse Arvados timestamp to unix time.'''
    try:
        return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
    except (TypeError, ValueError):
        return 0

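# For example (illustrative): convertTime("2014-01-01T00:00:00Z") returns
# 1388534400, while convertTime(None) hits the except clause and returns 0.
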
def sanitize_filename(dirty):
    '''Remove troublesome characters from filenames.'''
    # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
    if dirty is None:
        return None

    fn = ""
    for c in dirty:
        if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
            # skip control characters and /
            continue
        fn += c

    # strip leading - or ~ and leading/trailing whitespace
    stripped = fn.lstrip("-~ ").rstrip()
    return stripped if len(stripped) > 0 else None

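# A rough illustration of the effect (examples are mine, not from the source):
#   sanitize_filename("../../etc/passwd")  ->  "....etcpasswd"   (slashes dropped)
#   sanitize_filename(" -rf somedir")      ->  "rf somedir"      (leading '- ' stripped)
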
class FreshBase(object):
    '''Base class for maintaining fresh/stale state to determine when to update.'''
    def __init__(self):
        self._stale = True
        self._poll = False
        self._last_update = time.time()
        self._atime = time.time()
        self._poll_time = 60

    # Mark the value as stale
    def invalidate(self):
        self._stale = True

    # Test if the entries dict is stale.
    def stale(self):
        if self._stale:
            return True
        if self._poll:
            return (self._last_update + self._poll_time) < self._atime
        return False

    def fresh(self):
        self._stale = False
        self._last_update = time.time()

    def atime(self):
        return self._atime

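# Rough sketch of the freshness protocol (my illustration, not original code):
# with polling enabled, an entry goes stale once more than _poll_time seconds
# separate its last update from its most recent access.
#
#   d = SomeDirectory(llfuse.ROOT_INODE)   # hypothetical FreshBase subclass
#   d._poll, d._poll_time = True, 60
#   d.fresh()                              # update() just ran
#   d._atime = time.time() + 61            # pretend it is accessed much later
#   d.stale()                              # -> True; checkupdate() would refresh it
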
class File(FreshBase):
    '''Base for file objects.'''

    def __init__(self, parent_inode, _mtime=0):
        super(File, self).__init__()
        self.inode = None
        self.parent_inode = parent_inode
        self._mtime = _mtime

    def size(self):
        return 0

    def readfrom(self, off, size):
        return ''

    def mtime(self):
        return self._mtime

class StreamReaderFile(File):
    '''Wraps a StreamFileReader as a file.'''

    def __init__(self, parent_inode, reader, _mtime):
        super(StreamReaderFile, self).__init__(parent_inode, _mtime)
        self.reader = reader

    def size(self):
        return self.reader.size()

    def readfrom(self, off, size):
        return self.reader.readfrom(off, size)

class StringFile(File):
    '''Wrap a simple string as a file'''
    def __init__(self, parent_inode, contents, _mtime):
        super(StringFile, self).__init__(parent_inode, _mtime)
        self.contents = contents

    def size(self):
        return len(self.contents)

    def readfrom(self, off, size):
        return self.contents[off:(off+size)]

class ObjectFile(StringFile):
    '''Wrap a dict as a serialized json object.'''

    def __init__(self, parent_inode, obj):
        super(ObjectFile, self).__init__(parent_inode, "", 0)
        self.uuid = obj['uuid']
        self.update(obj)

    def update(self, obj):
        self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
        self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"

class Directory(FreshBase):
    '''Generic directory object, backed by a dict.
    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    '''

    def __init__(self, parent_inode):
        super(Directory, self).__init__()

        '''parent_inode is the integer inode number'''
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self._entries = {}
        self._mtime = time.time()

    # Overridden by subclasses to implement logic to update the entries dict
    # when the directory is stale
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    def size(self):
        return 0

    def checkupdate(self):
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.debug(e)

    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    def items(self):
        self.checkupdate()
        return self._entries.items()

    def __iter__(self):
        self.checkupdate()
        return self._entries.iterkeys()

    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries

    def merge(self, items, fn, same, new_entry):
        '''Helper method for updating the contents of the directory. Takes a list
        describing the new contents of the directory, reuses entries that are
        the same in both the old and new lists, creates new entries, and deletes
        old entries missing from the new list.

        items: iterable with new directory contents

        fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.
        '''

        oldentries = self._entries
        self._entries = {}
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            llfuse.invalidate_entry(self.inode, str(i))
            self.inodes.del_entry(oldentries[i])

        self._mtime = time.time()
        self.fresh()

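    # Sketch of how subclasses typically drive merge() (my illustration, not
    # taken from the original source): an update() implementation fetches
    # records and supplies the three callbacks, e.g.
    #
    #   self.merge(records,
    #              lambda i: i['name'],                    # fn: filename for the item
    #              lambda a, i: a.uuid == i['uuid'],       # same: keep existing entry?
    #              lambda i: ObjectFile(self.inode, i))    # new_entry: build a new one
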
    def clear(self):
        '''Delete all entries'''
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            if isinstance(oldentries[n], Directory):
                # clear subdirectories recursively
                oldentries[n].clear()
            llfuse.invalidate_entry(self.inode, str(n))
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def mtime(self):
        return self._mtime

class CollectionDirectory(Directory):
    '''Represents the root of a directory tree holding a collection.'''

    def __init__(self, parent_inode, inodes, api, collection):
        super(CollectionDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.collection_object_file = None
        self.collection_object = None
        if isinstance(collection, dict):
            self.collection_locator = collection['uuid']
        else:
            self.collection_locator = collection

    def same(self, i):
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def new_collection(self, new_collection_object):
        self.collection_object = new_collection_object

        if self.collection_object_file is not None:
            self.collection_object_file.update(self.collection_object)

        self.clear()
        collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
        for s in collection.all_streams():
            cwd = self
            for part in s.name().split('/'):
                if part != '' and part != '.':
                    partname = sanitize_filename(part)
                    if partname not in cwd._entries:
                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
                    cwd = cwd._entries[partname]
            for k, v in s.files().items():
                cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))

    def update(self):
        try:
            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                # a collection addressed by portable data hash is immutable,
                # so there is nothing to refresh
                return True

            with llfuse.lock_released:
                new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
                if "portable_data_hash" not in new_collection_object:
                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
            # end with llfuse.lock_released, re-acquire lock

            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                self.new_collection(new_collection_object)

            self.fresh()
            return True
        except apiclient.errors.HttpError as e:
            if e.resp.status == 404:
                _logger.warn("arv-mount %s: not found", self.collection_locator)
            else:
                _logger.error("arv-mount %s: error", self.collection_locator)
                _logger.exception(e)
        except Exception as detail:
            _logger.error("arv-mount %s: error", self.collection_locator)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
            _logger.exception(detail)
        return False

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#collection':
            if self.collection_object_file is None:
                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
                self.inodes.add_entry(self.collection_object_file)
            return self.collection_object_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def mtime(self):
        self.checkupdate()
        return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0

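# Usage note (my illustration, not from the original source): the virtual
# '.arvados#collection' entry exposes the collection's API record as JSON, so a
# client can do, e.g. (mount point and uuid are hypothetical):
#   $ cat /mnt/keep/zzzzz-4zz18-1234567890abcde/.arvados#collection
# to inspect the raw record backing that directory.
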
class MagicDirectory(Directory):
    '''A special directory that logically contains the set of all extant keep
    locators. When a file is referenced by lookup(), it is tested to see if it
    is a valid keep locator to a manifest, and if so, loads the manifest
    contents as a subdirectory of this directory with the locator as the
    directory name. Since querying a list of all extant keep locators is
    impractical, only collections that have already been accessed are visible
    to readdir().
    '''

    def __init__(self, parent_inode, inodes, api):
        super(MagicDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        # Have to defer creating readme_file because at this point we don't
        # yet have an inode assigned.
        self.readme_file = None

    def create_readme(self):
        if self.readme_file is None:
            text = '''This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
'''
            self.readme_file = self.inodes.add_entry(StringFile(self.inode, text, time.time()))
            self._entries["README"] = self.readme_file

    def __contains__(self, k):
        self.create_readme()

        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        try:
            e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
            if e.update():
                self._entries[k] = e
                return True
            else:
                return False
        except Exception as e:
            _logger.debug('arv-mount exception keep %s', e)
            return False

    def items(self):
        self.create_readme()
        return self._entries.items()

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

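# Example of the intended behavior (paths and locator are hypothetical): with
# the driver mounted at /mnt/keep, a collection directory only materializes
# once it is looked up:
#   $ ls /mnt/keep                                       # just README at first
#   $ ls /mnt/keep/1f4b0bc7583c2a7f9102c395f4ffc5e3+45   # lookup loads the manifest
# and afterwards that locator appears in subsequent listings of /mnt/keep.
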
class RecursiveInvalidateDirectory(Directory):
    def invalidate(self):
        if self.inode == llfuse.ROOT_INODE:
            llfuse.lock.acquire()
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception as e:
            _logger.exception(e)
        finally:
            if self.inode == llfuse.ROOT_INODE:
                llfuse.lock.release()

class TagsDirectory(RecursiveInvalidateDirectory):
    '''A special directory that contains as subdirectories all tags visible to the user.'''

    def __init__(self, parent_inode, inodes, api, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct=True).execute()

        self.merge(tags['items'],
                   lambda i: i['name'] if 'name' in i else i['uuid'],
                   lambda a, i: a.tag == i['name'],
                   lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))

class TagDirectory(Directory):
    '''A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    '''

    def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
                                                               ['name', '=', self.tag],
                                                               ['head_uuid', 'is_a', 'arvados#collection']],
                                                      select=['head_uuid']).execute()

        self.merge(taggedcollections['items'],
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))

class ProjectDirectory(Directory):
    '''A special directory that contains the contents of a project.'''

    def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
        super(ProjectDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.project_object = project_object
        self.project_object_file = None
        self.uuid = project_object['uuid']

    def createDirectory(self, i):
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def update(self):
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            if 'name' in i:
                if i['name'] is None or len(i['name']) == 0:
                    return None
                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                    # collection or subproject
                    return i['name']
                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                    # name link to a collection
                    return i['name']
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    # something else
                    return "{}.{}".format(i['name'], i['kind'][8:])
            else:
                return None

        def samefn(a, i):
            if isinstance(a, CollectionDirectory):
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid == i['uuid'] and not a.stale()
            return False

        with llfuse.lock_released:
            if group_uuid_pattern.match(self.uuid):
                self.project_object = self.api.groups().get(uuid=self.uuid).execute()
            elif user_uuid_pattern.match(self.uuid):
                self.project_object = self.api.users().get(uuid=self.uuid).execute()

            contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
            # Name links will be obsolete soon; take this out when there are no more pre-#3036 clients in use.
            contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   namefn,
                   samefn,
                   self.createDirectory)

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)

class SharedDirectory(Directory):
    '''A special directory that represents users or groups who have shared projects with me.'''

    def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
        super(SharedDirectory, self).__init__(parent_inode)
        self.current_user = api.users().current().execute()
        self.inodes = inodes
        self.api = api
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class', '=', 'project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid', 'in', list(root_owners)]])
            lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid', 'in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if "name" in obr:
                        contents[obr["name"]] = obr
                    if "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r
        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception as e:
            _logger.exception(e)

class FileHandle(object):
    '''Connects a numeric file handle to a File or Directory object that has
    been opened by the client.'''

    def __init__(self, fh, entry):
        self.fh = fh
        self.entry = entry

class Inodes(object):
    '''Manage the set of inodes. This is the mapping from a numeric id
    to a concrete File or Directory object'''

    def __init__(self):
        self._entries = {}
        self._counter = llfuse.ROOT_INODE

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        return self._entries.iterkeys()

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def add_entry(self, entry):
        entry.inode = self._counter
        self._entries[entry.inode] = entry
        self._counter += 1
        return entry

    def del_entry(self, entry):
        llfuse.invalidate_inode(entry.inode)
        del self._entries[entry.inode]

class Operations(llfuse.Operations):
    '''This is the main interface with llfuse. The methods on this object are
    called by llfuse threads to service FUSE events by querying and reading from
    the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using "with llfuse.lock_released:"'''

    def __init__(self, uid, gid):
        super(Operations, self).__init__()

        self.inodes = Inodes()
        self.uid = uid
        self.gid = gid

        # dict of inode to filehandle
        self._filehandles = {}
        self._filehandles_counter = 1

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

    def init(self):
        # Allow threads that are waiting for the driver to be finished
        # initializing to continue
        self.initlock.set()

    def access(self, inode, mode, ctx):
        return True

    def getattr(self, inode):
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.generation = 0
        entry.entry_timeout = 300
        entry.attr_timeout = 300

        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        else:
            entry.st_mode |= stat.S_IFREG

        entry.st_nlink = 1
        entry.st_uid = self.uid
        entry.st_gid = self.gid
        entry.st_rdev = 0

        entry.st_size = e.size()

        entry.st_blksize = 512
        entry.st_blocks = (e.size()/512)+1
        entry.st_atime = int(e.atime())
        entry.st_mtime = int(e.mtime())
        entry.st_ctime = int(e.mtime())

        return entry

    def lookup(self, parent_inode, name):
        _logger.debug("arv-mount lookup: parent_inode %i name %s",
                      parent_inode, name)
        inode = None

        if name == '.':
            inode = parent_inode
        else:
            if parent_inode in self.inodes:
                p = self.inodes[parent_inode]
                if name == '..':
                    inode = p.parent_inode
                elif isinstance(p, Directory) and name in p:
                    inode = p[name].inode

        if inode is not None:
            return self.getattr(inode)
        else:
            raise llfuse.FUSEError(errno.ENOENT)

    def open(self, inode, flags):
        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
            raise llfuse.FUSEError(errno.EROFS)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        self._filehandles[fh] = FileHandle(fh, p)
        return fh

    def read(self, fh, off, size):
        _logger.debug("arv-mount read %i %i %i", fh, off, size)
        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        # update atime
        handle.entry._atime = time.time()

        try:
            with llfuse.lock_released:
                return handle.entry.readfrom(off, size)
        except Exception:
            raise llfuse.FUSEError(errno.EIO)

    def release(self, fh):
        if fh in self._filehandles:
            del self._filehandles[fh]

    def opendir(self, inode):
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
        else:
            raise llfuse.FUSEError(errno.EIO)

        # update atime
        p._atime = time.time()

        self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
        return fh

    def readdir(self, fh, off):
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        _logger.debug("arv-mount handle.entry %s", handle.entry)

        e = off
        while e < len(handle.entry):
            if handle.entry[e][1].inode in self.inodes:
                yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
            e += 1

    def releasedir(self, fh):
        del self._filehandles[fh]

    def statfs(self):
        st = llfuse.StatvfsData()
        st.f_bsize = 64 * 1024
        return st

    # The llfuse documentation recommends only overriding functions that
    # are actually implemented, as the default implementation will raise ENOSYS.
    # However, there is a bug in the llfuse default implementation of create()
    # "create() takes exactly 5 positional arguments (6 given)" which will crash
    # arv-mount.
    # The workaround is to implement it with the proper number of parameters,
    # and then everything works out.
    def create(self, p1, p2, p3, p4, p5):
        raise llfuse.FUSEError(errno.EROFS)