# FUSE driver for Arvados Keep

import os, errno, stat, threading, json, re, time, calendar, logging, pprint
import arvados
import arvados.util
import apiclient.errors
import llfuse
from llfuse import FUSEError

_logger = logging.getLogger('arvados.arvados_fuse')

class SafeApi(object):
    '''Threadsafe wrapper for API object. This stores and returns a different api
    object per thread, because httplib2, which underlies apiclient, is not
    threadsafe.
    '''

    def __init__(self, config):
        self.host = config.get('ARVADOS_API_HOST')
        self.token = config.get('ARVADOS_API_TOKEN')
        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        self.local = threading.local()

    def localapi(self):
        if 'api' not in self.local.__dict__:
            self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
        return self.local.api

    def collections(self):
        return self.localapi().collections()

    def links(self):
        return self.localapi().links()

    def groups(self):
        return self.localapi().groups()

    def users(self):
        return self.localapi().users()
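
# Illustrative sketch (hypothetical helper, never called by the driver):
# SafeApi hands each thread its own lazily-created API client, so concurrent
# FUSE threads never share one httplib2 connection.  Assumes the standard
# ARVADOS_API_* settings are available via arvados.config.
def _example_safeapi_per_thread():
    safe_api = SafeApi(arvados.config)
    results = []

    def list_one_collection():
        # First call in this thread builds the thread-local client via localapi().
        results.append(safe_api.collections().list(limit=1).execute())

    workers = [threading.Thread(target=list_one_collection) for _ in range(2)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return results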

def convertTime(t):
    '''Parse Arvados timestamp to unix time.'''
    return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
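
# For example, convertTime("2014-06-02T15:04:05Z") returns 1401721445.
# calendar.timegm() treats the parsed struct_time as UTC, which is why it is
# used here instead of time.mktime() (which would apply the local timezone).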

def sanitize_filename(dirty):
    '''Remove troublesome characters from filenames.'''
    # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
    if dirty is None:
        return None
    fn = ""
    for c in dirty:
        if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
            # skip control characters and /
            continue
        fn += c

    # strip leading - or ~ and leading/trailing whitespace
    stripped = fn.lstrip("-~ ").rstrip()
    return stripped
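
# Illustrative sketch (with the filtering loop as reconstructed above):
# sanitize_filename("--my/file\x01.txt ") drops the '/' and the control
# character, strips the leading dashes and trailing space, and returns
# "myfile.txt".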

class FreshBase(object):
    '''Base class for maintaining fresh/stale state to determine when to update.'''

    def __init__(self):
        self._stale = True
        self._poll = False
        self._poll_time = 60
        self._last_update = time.time()

    # Mark the value as stale
    def invalidate(self):
        self._stale = True

    # Test if the entries dict is stale
    def stale(self):
        if self._stale:
            return True
        if self._poll:
            return (self._last_update + self._poll_time) < time.time()
        return False

    def fresh(self):
        self._stale = False
        self._last_update = time.time()
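
# Illustrative sketch (hypothetical usage; the classes appear later in this
# file): directory contents are refreshed lazily, e.g.
#
#   d = CollectionDirectory(llfuse.ROOT_INODE, inodes, api, locator)
#   d.invalidate()       # mark the cached entries stale
#   if d.stale():        # also true when polling and _poll_time has elapsed
#       d.checkupdate()  # refetch the directory contents from the API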

class File(FreshBase):
    '''Base for file objects.'''

    def __init__(self, parent_inode, _ctime=0, _mtime=0):
        super(File, self).__init__()
        self.parent_inode = parent_inode
        self._ctime = _ctime
        self._mtime = _mtime

    def readfrom(self, off, size):
        return ''

class StreamReaderFile(File):
    '''Wraps a StreamFileReader as a file.'''

    def __init__(self, parent_inode, reader, _ctime, _mtime):
        super(StreamReaderFile, self).__init__(parent_inode, _ctime, _mtime)
        self.reader = reader

    def size(self):
        return self.reader.size()

    def readfrom(self, off, size):
        return self.reader.readfrom(off, size)

class StringFile(File):
    '''Wrap a simple string as a file.'''

    def __init__(self, parent_inode, contents, _ctime, _mtime):
        super(StringFile, self).__init__(parent_inode, _ctime, _mtime)
        self.contents = contents

    def size(self):
        return len(self.contents)

    def readfrom(self, off, size):
        return self.contents[off:(off+size)]

class ObjectFile(StringFile):
    '''Wrap a dict as a serialized json object.'''

    def __init__(self, parent_inode, contents):
        _ctime = convertTime(contents['created_at']) if 'created_at' in contents else 0
        _mtime = convertTime(contents['modified_at']) if 'modified_at' in contents else 0
        super(ObjectFile, self).__init__(parent_inode, json.dumps(contents, indent=4, sort_keys=True)+"\n", _ctime, _mtime)
        self.contentsdict = contents
        self.uuid = self.contentsdict['uuid']
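
# Illustrative sketch (hypothetical record): an ObjectFile exposes an API
# record as pretty-printed JSON, so reading it back returns the serialized
# dict:
#
#   obj = ObjectFile(llfuse.ROOT_INODE, {'uuid': 'zzzzz-j7d0g-000000000000000',
#                                        'modified_at': '2014-06-02T15:04:05Z'})
#   obj.readfrom(0, obj.size())   # -> the JSON text, terminated by "\n"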

class Directory(FreshBase):
    '''Generic directory object, backed by a dict.
    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    '''

    def __init__(self, parent_inode):
        super(Directory, self).__init__()

        '''parent_inode is the integer inode number'''
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self._entries = {}

    # Overridden by subclasses to implement logic to update the entries dict
    # when the directory is stale
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    def size(self):
        return 0

    def checkupdate(self):
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.debug(e)

    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    def items(self):
        return self._entries.items()

    def __iter__(self):
        return self._entries.iterkeys()

    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries

    def merge(self, items, fn, same, new_entry):
        '''Helper method for updating the contents of the directory. Takes a list
        describing the new contents of the directory, reuses entries that are
        the same in both the old and new lists, creates new entries, and deletes
        old entries missing from the new list.

        items: iterable with new directory contents

        fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.
        '''
        oldentries = self._entries
        self._entries = {}
        for i in items:
            name = sanitize_filename(fn(i))
            if name is not None:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            llfuse.invalidate_entry(self.inode, str(i))
            self.inodes.del_entry(oldentries[i])
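
    # Illustrative sketch of how subclasses drive merge(): pass one callable to
    # pick each entry's name, one to decide whether an existing entry still
    # matches the record, and one to build a new entry.  TagsDirectory below
    # follows this pattern:
    #
    #   self.merge(tags['items'],
    #              lambda i: i['name'],                     # entry name
    #              lambda a, i: a.tag == i,                 # keep if unchanged
    #              lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name']))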

    def clear(self):
        '''Delete all entries'''
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            if isinstance(n, Directory):
                n.clear()
            llfuse.invalidate_entry(self.inode, str(n))
            self.inodes.del_entry(oldentries[n])

class CollectionDirectory(Directory):
    '''Represents the root of a directory tree holding a collection.'''

    def __init__(self, parent_inode, inodes, api, collection_locator):
        super(CollectionDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.collection_locator = collection_locator
        self.manifest_text_file = None
        self.pdh_file = None
        self.collection_object = None

    def same(self, i):
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def update(self):
        try:
            if self.collection_object is not None and re.match(r'^[a-f0-9]{32}', self.collection_locator):
                # identified by content hash, so the collection can never change
                return True

            with llfuse.lock_released:
                new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
                if "portable_data_hash" not in new_collection_object:
                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
            # end with llfuse.lock_released, re-acquire lock

            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                self.collection_object = new_collection_object

                if self.manifest_text_file is not None:
                    self.manifest_text_file.contents = self.collection_object["manifest_text"]
                    self.manifest_text_file._ctime = self.ctime()
                    self.manifest_text_file._mtime = self.mtime()
                if self.pdh_file is not None:
                    self.pdh_file.contents = self.collection_object["portable_data_hash"]
                    self.pdh_file._ctime = self.ctime()
                    self.pdh_file._mtime = self.mtime()

                collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
                for s in collection.all_streams():
                    cwd = self
                    for part in s.name().split('/'):
                        if part != '' and part != '.':
                            partname = sanitize_filename(part)
                            if partname not in cwd._entries:
                                cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
                            cwd = cwd._entries[partname]
                    for k, v in s.files().items():
                        cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.ctime(), self.mtime()))

            return True
        except apiclient.errors.HttpError as e:
            pprint.pprint(e.resp.status)
            if e.resp.status == 404:
                _logger.warn("arv-mount %s: not found", self.collection_locator)
            else:
                _logger.error("arv-mount %s: error", self.collection_locator)
                _logger.exception(e)
        except Exception as detail:
            _logger.error("arv-mount %s: error", self.collection_locator)
            if "manifest_text" in self.collection_object:
                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
            _logger.exception(detail)
        return False

    def __getitem__(self, item):
        if item == '.arvados#collection.manifest_text':
            if self.manifest_text_file is None:
                self.manifest_text_file = StringFile(self.inode, self.collection_object["manifest_text"], self.ctime(), self.mtime())
                self.inodes.add_entry(self.manifest_text_file)
            return self.manifest_text_file
        elif item == '.arvados#collection.portable_data_hash':
            if self.pdh_file is None:
                self.pdh_file = StringFile(self.inode, self.collection_object["portable_data_hash"], self.ctime(), self.mtime())
                self.inodes.add_entry(self.pdh_file)
            return self.pdh_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k in ('.arvados#collection.manifest_text', '.arvados#collection.portable_data_hash'):
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def ctime(self):
        return convertTime(self.collection_object["created_at"])

    def mtime(self):
        return convertTime(self.collection_object["modified_at"])

class MagicDirectory(Directory):
    '''A special directory that logically contains the set of all extant keep
    locators. When a file is referenced by lookup(), it is tested to see if it
    is a valid keep locator to a manifest, and if so, loads the manifest
    contents as a subdirectory of this directory with the locator as the
    directory name. Since querying a list of all extant keep locators is
    impractical, only collections that have already been accessed are visible
    to readdir().
    '''

    def __init__(self, parent_inode, inodes, api):
        super(MagicDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api

    def __contains__(self, k):
        if k in self._entries:
            return True
        try:
            e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
            if e.update():
                self._entries[k] = e
                return True
            else:
                return False
        except Exception as e:
            _logger.debug('arv-mount exception keep %s', e)
            return False

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)
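
# Illustrative sketch (hypothetical locator): a lookup under this directory
# lands in __contains__, which tries to load the locator as a collection and
# caches it on success:
#
#   magic = inodes.add_entry(MagicDirectory(llfuse.ROOT_INODE, inodes, api))
#   'ed06147a33e9708d5b9cd8fb0a689c3e+69' in magic   # fetches and caches the collection
#   magic['ed06147a33e9708d5b9cd8fb0a689c3e+69']     # returns the cached CollectionDirectory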

class RecursiveInvalidateDirectory(Directory):
    def invalidate(self):
        if self.inode == llfuse.ROOT_INODE:
            llfuse.lock.acquire()
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception as e:
            _logger.exception(e)
        finally:
            if self.inode == llfuse.ROOT_INODE:
                llfuse.lock.release()

class TagsDirectory(RecursiveInvalidateDirectory):
    '''A special directory that contains as subdirectories all tags visible to the user.'''

    def __init__(self, parent_inode, inodes, api, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        # arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct=True).execute()
        self.merge(tags['items'],
                   lambda i: i['name'] if 'name' in i else i['uuid'],
                   lambda a, i: a.tag == i,
                   lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))

class TagDirectory(Directory):
    '''A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    '''

    def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
                                                               ['name', '=', self.tag],
                                                               ['head_uuid', 'is_a', 'arvados#collection']],
                                                      select=['head_uuid']).execute()
        self.merge(taggedcollections['items'],
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))

class ProjectDirectory(RecursiveInvalidateDirectory):
    '''A special directory that contains the contents of a project.'''

    def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
        super(ProjectDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self._poll = poll
        self._poll_time = poll_time
        self.project_object = project_object
        self.uuid = project_object['uuid']

    def createDirectory(self, i):
        if re.match(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}', i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, i['uuid'])
        elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
        elif re.match(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}', i['uuid']) and i['head_kind'] == 'arvados#collection':
            return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
        #elif re.match(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}', i['uuid']):
        elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def update(self):
        def namefn(i):
            if 'name' in i:
                if i['name'] is None:
                    return None
                elif re.match(r'[a-z0-9]{5}-(4zz18|j7d0g)-[a-z0-9]{15}', i['uuid']):
                    # collection or subproject
                    return i['name']
                elif re.match(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}', i['uuid']) and i['head_kind'] == 'arvados#collection':
                    return i['name']
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    return "{}.{}".format(i['name'], i['kind'][8:])
            return None

        def samefn(a, i):
            if isinstance(a, CollectionDirectory):
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid == i['uuid'] and not a.stale()
            return False

        with llfuse.lock_released:
            if re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', self.uuid):
                self.project_object = self.api.groups().get(uuid=self.uuid).execute()
            elif re.match(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}', self.uuid):
                self.project_object = self.api.users().get(uuid=self.uuid).execute()

            contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
            # Name links will be obsolete soon; take this out when there are no more pre-#3036 name links in use.
            contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   namefn,
                   samefn,
                   self.createDirectory)

    def ctime(self):
        return convertTime(self.project_object["created_at"]) if "created_at" in self.project_object else 0

    def mtime(self):
        return convertTime(self.project_object["modified_at"]) if "modified_at" in self.project_object else 0

class SharedDirectory(RecursiveInvalidateDirectory):
    '''A special directory that represents users or groups who have shared projects with me.'''

    def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
        super(SharedDirectory, self).__init__(parent_inode)
        self.current_user = api.users().current().execute()
        self.inodes = inodes
        self.api = api
        # arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class', '=', 'project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid', 'in', list(root_owners)]])
            lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid', 'in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if "name" in obr:
                        contents[obr["name"]] = obr
                    if "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r
        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception as e:
            _logger.exception(e)

class FileHandle(object):
    '''Connects a numeric file handle to a File or Directory object that has
    been opened by the client.'''

    def __init__(self, fh, entry):
        self.fh = fh
        self.entry = entry

class Inodes(object):
    '''Manage the set of inodes. This is the mapping from a numeric id
    to a concrete File or Directory object'''

    def __init__(self):
        self._entries = {}
        self._counter = llfuse.ROOT_INODE

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        return self._entries.iterkeys()

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def add_entry(self, entry):
        entry.inode = self._counter
        self._entries[entry.inode] = entry
        self._counter += 1
        return entry

    def del_entry(self, entry):
        llfuse.invalidate_inode(entry.inode)
        del self._entries[entry.inode]

class Operations(llfuse.Operations):
    '''This is the main interface with llfuse. The methods on this object are
    called by llfuse threads to service FUSE events to query and read from
    the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using "with llfuse.lock_released:"'''
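
    # Illustrative sketch of the locking rule above: anything that can block on
    # the network drops the global lock first and re-acquires it on exit, e.g.
    #
    #   def update(self):
    #       with llfuse.lock_released:
    #           listing = self.api.collections().list().execute()  # slow API call
    #       self.merge(listing['items'], ...)                       # back under the lock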

    def __init__(self, uid, gid):
        super(Operations, self).__init__()

        self.inodes = Inodes()
        self.uid = uid
        self.gid = gid

        # dict of inode to filehandle
        self._filehandles = {}
        self._filehandles_counter = 1

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

    def init(self):
        # Allow threads that are waiting for the driver to be finished
        # initializing to continue
        self.initlock.set()

    def access(self, inode, mode, ctx):
        return True

    def getattr(self, inode):
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        entry.entry_timeout = 300
        entry.attr_timeout = 300

        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        else:
            entry.st_mode |= stat.S_IFREG

        entry.st_uid = self.uid
        entry.st_gid = self.gid

        entry.st_size = e.size()

        entry.st_blksize = 512
        entry.st_blocks = (e.size()/512)
        if e.size()/512 != 0:
            entry.st_blocks += 1
        entry.st_mtime = e.mtime()
        entry.st_ctime = e.ctime()

        return entry

    def lookup(self, parent_inode, name):
        _logger.debug("arv-mount lookup: parent_inode %i name %s",
                      parent_inode, name)
        inode = None

        if parent_inode in self.inodes:
            p = self.inodes[parent_inode]
            if name == '..':
                inode = p.parent_inode
            elif name in p:
                inode = p[name].inode

        if inode is not None:
            return self.getattr(inode)
        else:
            raise llfuse.FUSEError(errno.ENOENT)

    def open(self, inode, flags):
        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
            raise llfuse.FUSEError(errno.EROFS)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        self._filehandles[fh] = FileHandle(fh, p)
        return fh

    def read(self, fh, off, size):
        _logger.debug("arv-mount read %i %i %i", fh, off, size)
        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        try:
            with llfuse.lock_released:
                return handle.entry.readfrom(off, size)
        except Exception:
            raise llfuse.FUSEError(errno.EIO)

    def release(self, fh):
        if fh in self._filehandles:
            del self._filehandles[fh]

    def opendir(self, inode):
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
        else:
            raise llfuse.FUSEError(errno.EIO)

        self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
        return fh

    def readdir(self, fh, off):
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        _logger.debug("arv-mount handle.entry %s", handle.entry)

        e = off
        while e < len(handle.entry):
            if handle.entry[e][1].inode in self.inodes:
                yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
            e += 1

    def releasedir(self, fh):
        del self._filehandles[fh]

    def statfs(self):
        st = llfuse.StatvfsData()
        st.f_bsize = 64 * 1024

    # The llfuse documentation recommends only overloading functions that
    # are actually implemented, as the default implementation will raise ENOSYS.
    # However, there is a bug in the llfuse default implementation of create()
    # ("create() takes exactly 5 positional arguments (6 given)") which will
    # crash arv-mount. The workaround is to implement it with the proper number
    # of parameters, and then everything works out.
    def create(self, p1, p2, p3, p4, p5):
        raise llfuse.FUSEError(errno.EROFS)
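
# Illustrative sketch (hypothetical helper, not part of this module): the
# arv-mount entry point wires an Operations object into llfuse roughly like
# this; the exact llfuse calls and mount options depend on the llfuse version
# in use.
def _example_mount(mountpoint, uid, gid):
    operations = Operations(uid, gid)
    api = SafeApi(arvados.config)
    # The first add_entry() receives llfuse.ROOT_INODE, making this the root.
    operations.inodes.add_entry(MagicDirectory(llfuse.ROOT_INODE, operations.inodes, api))
    llfuse.init(operations, mountpoint, ['fsname=arv-mount', 'ro'])
    try:
        llfuse.main()
    finally:
        llfuse.close()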