2 # FUSE driver for Arvados Keep
8 from llfuse import FUSEError
23 _logger = logging.getLogger('arvados.arvados_fuse')
25 class SafeApi(object):
26 '''Threadsafe wrapper for API object. This stores and returns a different api
27 object per thread, because httplib2 which underlies apiclient is not
31 def __init__(self, config):
32 self.host = config.get('ARVADOS_API_HOST')
33 self.token = config.get('ARVADOS_API_TOKEN')
34 self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
35 self.local = threading.local()
38 if 'api' not in self.local.__dict__:
39 self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
42 def collections(self):
43 return self.localapi().collections()
46 return self.localapi().links()
49 return self.localapi().groups()
52 return self.localapi().users()
55 '''Parse Arvados timestamp to unix time.'''
56 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
58 def sanitize_filename(dirty):
59 '''Remove troublesome characters from filenames.'''
60 # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
66 if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
67 # skip control characters and /
71 # strip leading - or ~ and leading/trailing whitespace
72 stripped = fn.lstrip("-~ ").rstrip()
79 class FreshBase(object):
80 '''Base class for maintaining fresh/stale state to determine when to update.'''
84 self._last_update = time.time()
87 # Mark the value as stale
91 # Test if the entries dict is stale
96 return (self._last_update + self._poll_time) < time.time()
101 self._last_update = time.time()
110 class File(FreshBase):
111 '''Base for file objects.'''
113 def __init__(self, parent_inode, _ctime=0, _mtime=0):
114 super(File, self).__init__()
116 self.parent_inode = parent_inode
123 def readfrom(self, off, size):
133 class StreamReaderFile(File):
134 '''Wraps a StreamFileReader as a file.'''
136 def __init__(self, parent_inode, reader, _ctime, _mtime):
137 super(StreamReaderFile, self).__init__(parent_inode, _ctime, _mtime)
141 return self.reader.size()
def readfrom(self, off, size):
    '''Delegate a read of `size` bytes at offset `off` to the wrapped
    StreamFileReader and return whatever it returns.'''
    reader = self.reader
    return reader.readfrom(off, size)
150 class StringFile(File):
151 '''Wrap a simple string as a file'''
152 def __init__(self, parent_inode, contents, _ctime, _mtime):
153 super(StringFile, self).__init__(parent_inode, _ctime, _mtime)
154 self.contents = contents
157 return len(self.contents)
def readfrom(self, off, size):
    '''Return the slice of the contents string that starts at `off` and is
    at most `size` characters long (shorter or empty near the end).'''
    end = off + size
    return self.contents[off:end]
163 class ObjectFile(StringFile):
164 '''Wrap a dict as a serialized json object.'''
166 def __init__(self, parent_inode, obj):
167 super(ObjectFile, self).__init__(parent_inode, "", 0, 0)
168 self.uuid = obj['uuid']
def update(self, obj):
    '''Refresh this file from the API object dict `obj`.

    ctime/mtime are taken from the 'created_at'/'modified_at' fields
    (0 when a field is absent). The file contents become the object
    serialized as JSON (4-space indent, sorted keys) plus a newline.
    '''
    if 'created_at' in obj:
        self._ctime = convertTime(obj['created_at'])
    else:
        self._ctime = 0
    if 'modified_at' in obj:
        self._mtime = convertTime(obj['modified_at'])
    else:
        self._mtime = 0
    serialized = json.dumps(obj, indent=4, sort_keys=True)
    self.contents = serialized + "\n"
177 class Directory(FreshBase):
178 '''Generic directory object, backed by a dict.
179 Consists of a set of entries with the key representing the filename
180 and the value referencing a File or Directory object.
183 def __init__(self, parent_inode):
184 super(Directory, self).__init__()
186 '''parent_inode is the integer inode number'''
188 if not isinstance(parent_inode, int):
189 raise Exception("parent_inode should be an int")
190 self.parent_inode = parent_inode
193 # Overridden by subclasses to implement logic to update the entries dict
194 # when the directory is stale
198 # Only used when computing the size of the disk footprint of the directory
203 def checkupdate(self):
207 except apiclient.errors.HttpError as e:
210 def __getitem__(self, item):
212 return self._entries[item]
216 return self._entries.items()
220 return self._entries.iterkeys()
222 def __contains__(self, k):
224 return k in self._entries
226 def merge(self, items, fn, same, new_entry):
227 '''Helper method for updating the contents of the directory. Takes a list
228 describing the new contents of the directory, reuse entries that are
229 the same in both the old and new lists, create new entries, and delete
230 old entries missing from the new list.
232 items: iterable with new directory contents
234 fn: function to take an entry in 'items' and return the desired file or
235 directory name, or None if this entry should be skipped
237 same: function to compare an existing entry (a File or Directory
238 object) with an entry in the items list to determine whether to keep
241 new_entry: function to create a new directory entry (File or Directory
242 object) from an entry in the items list.
246 oldentries = self._entries
249 name = sanitize_filename(fn(i))
251 if name in oldentries and same(oldentries[name], i):
252 # move existing directory entry over
253 self._entries[name] = oldentries[name]
256 # create new directory entry
259 self._entries[name] = self.inodes.add_entry(ent)
261 # delete any other directory entries that were not found in 'items'
263 llfuse.invalidate_entry(self.inode, str(i))
264 self.inodes.del_entry(oldentries[i])
268 '''Delete all entries'''
269 oldentries = self._entries
272 if isinstance(n, Directory):
274 llfuse.invalidate_entry(self.inode, str(n))
275 self.inodes.del_entry(oldentries[n])
279 class CollectionDirectory(Directory):
280 '''Represents the root of a directory tree holding a collection.'''
282 def __init__(self, parent_inode, inodes, api, collection):
283 super(CollectionDirectory, self).__init__(parent_inode)
286 self.collection_object_file = None
287 self.collection_object = None
288 if isinstance(collection, dict):
289 self.collection_locator = collection['uuid']
291 self.collection_locator = collection
294 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
296 def new_collection(self, new_collection_object):
297 self.collection_object = new_collection_object
299 if self.collection_object_file is not None:
300 self.collection_object_file.update(self.collection_object)
303 collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
304 for s in collection.all_streams():
306 for part in s.name().split('/'):
307 if part != '' and part != '.':
308 partname = sanitize_filename(part)
309 if partname not in cwd._entries:
310 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
311 cwd = cwd._entries[partname]
312 for k, v in s.files().items():
313 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.ctime(), self.mtime()))
317 if self.collection_object is not None and re.match(r'^[a-f0-9]{32}', self.collection_locator):
320 with llfuse.lock_released:
321 new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
322 if "portable_data_hash" not in new_collection_object:
323 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
324 # end with llfuse.lock_released, re-acquire lock
326 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
327 self.new_collection(new_collection_object)
331 except apiclient.errors.HttpError as e:
332 if e.resp.status == 404:
333 _logger.warn("arv-mount %s: not found", self.collection_locator)
335 _logger.error("arv-mount %s: error", self.collection_locator)
336 _logger.exception(detail)
337 except Exception as detail:
338 _logger.error("arv-mount %s: error", self.collection_locator)
339 if "manifest_text" in self.collection_object:
340 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
341 _logger.exception(detail)
344 def __getitem__(self, item):
346 if item == '.arvados#collection':
347 if self.collection_object_file is None:
348 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
349 self.inodes.add_entry(self.collection_object_file)
350 return self.collection_object_file
352 return super(CollectionDirectory, self).__getitem__(item)
354 def __contains__(self, k):
355 if k == '.arvados#collection':
358 return super(CollectionDirectory, self).__contains__(k)
362 return convertTime(self.collection_object["created_at"]) if self.collection_object is not None else 0
366 return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None else 0
369 class MagicDirectory(Directory):
370 '''A special directory that logically contains the set of all extant keep
371 locators. When a file is referenced by lookup(), it is tested to see if it
372 is a valid keep locator to a manifest, and if so, loads the manifest
373 contents as a subdirectory of this directory with the locator as the
374 directory name. Since querying a list of all extant keep locators is
375 impractical, only collections that have already been accessed are visible
379 def __init__(self, parent_inode, inodes, api):
380 super(MagicDirectory, self).__init__(parent_inode)
384 def __contains__(self, k):
385 if k in self._entries:
388 e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
394 except Exception as e:
395 _logger.debug('arv-mount exception keep %s', e)
398 def __getitem__(self, item):
400 return self._entries[item]
402 raise KeyError("No collection with id " + item)
405 class RecursiveInvalidateDirectory(Directory):
406 def invalidate(self):
407 if self.inode == llfuse.ROOT_INODE:
408 llfuse.lock.acquire()
410 super(RecursiveInvalidateDirectory, self).invalidate()
411 for a in self._entries:
412 self._entries[a].invalidate()
413 except Exception as e:
416 if self.inode == llfuse.ROOT_INODE:
417 llfuse.lock.release()
420 class TagsDirectory(RecursiveInvalidateDirectory):
421 '''A special directory that contains as subdirectories all tags visible to the user.'''
423 def __init__(self, parent_inode, inodes, api, poll_time=60):
424 super(TagsDirectory, self).__init__(parent_inode)
428 # arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
431 self._poll_time = poll_time
434 with llfuse.lock_released:
435 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
437 self.merge(tags['items'],
438 lambda i: i['name'] if 'name' in i else i['uuid'],
439 lambda a, i: a.tag == i,
440 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
443 class TagDirectory(Directory):
444 '''A special directory that contains as subdirectories all collections visible
445 to the user that are tagged with a particular tag.
448 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
449 super(TagDirectory, self).__init__(parent_inode)
454 self._poll_time = poll_time
457 with llfuse.lock_released:
458 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
459 ['name', '=', self.tag],
460 ['head_uuid', 'is_a', 'arvados#collection']],
461 select=['head_uuid']).execute()
462 self.merge(taggedcollections['items'],
463 lambda i: i['head_uuid'],
464 lambda a, i: a.collection_locator == i['head_uuid'],
465 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))
468 class ProjectDirectory(RecursiveInvalidateDirectory):
469 '''A special directory that contains the contents of a project.'''
471 def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
472 super(ProjectDirectory, self).__init__(parent_inode)
475 self.project_object = project_object
476 self.project_object_file = ObjectFile(self.inode, self.project_object)
477 self.inodes.add_entry(self.project_object_file)
478 self.uuid = project_object['uuid']
480 def createDirectory(self, i):
481 if re.match(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}', i['uuid']):
482 return CollectionDirectory(self.inode, self.inodes, self.api, i)
483 elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
484 return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
485 elif re.match(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}', i['uuid']):
486 if i['head_kind'] == 'arvados#collection' or re.match('[0-9a-f]{32}\+\d+', i['head_uuid']):
487 return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
490 #elif re.match(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}', i['uuid']):
492 elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
493 return ObjectFile(self.parent_inode, i)
500 if i['name'] is None or len(i['name']) == 0:
502 elif re.match(r'[a-z0-9]{5}-(4zz18|j7d0g)-[a-z0-9]{15}', i['uuid']):
503 # collection or subproject
505 elif re.match(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}', i['uuid']) and i['head_kind'] == 'arvados#collection':
508 elif 'kind' in i and i['kind'].startswith('arvados#'):
510 return "{}.{}".format(i['name'], i['kind'][8:])
515 if isinstance(a, CollectionDirectory):
516 return a.collection_locator == i['uuid']
517 elif isinstance(a, ProjectDirectory):
518 return a.uuid == i['uuid']
519 elif isinstance(a, ObjectFile):
520 return a.uuid == i['uuid'] and not a.stale()
523 with llfuse.lock_released:
524 if re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', self.uuid):
525 self.project_object = self.api.groups().get(uuid=self.uuid).execute()
526 elif re.match(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}', self.uuid):
527 self.project_object = self.api.users().get(uuid=self.uuid).execute()
529 contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
530 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
531 contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
533 # end with llfuse.lock_released, re-acquire lock
538 self.createDirectory)
540 def __getitem__(self, item):
542 if item == '.arvados#project':
543 return self.project_object_file
545 return super(ProjectDirectory, self).__getitem__(item)
547 def __contains__(self, k):
548 if k == '.arvados#project':
551 return super(ProjectDirectory, self).__contains__(k)
554 return convertTime(self.project_object["created_at"]) if "created_at" in self.project_object else 0
557 return convertTime(self.project_object["modified_at"]) if "modified_at" in self.project_object else 0
560 class SharedDirectory(RecursiveInvalidateDirectory):
561 '''A special directory that represents users or groups who have shared projects with me.'''
563 def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
564 super(SharedDirectory, self).__init__(parent_inode)
565 self.current_user = api.users().current().execute()
570 # arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
573 self._poll_time = poll_time
576 with llfuse.lock_released:
577 all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
579 for ob in all_projects:
580 objects[ob['uuid']] = ob
584 for ob in all_projects:
585 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
587 root_owners[ob['owner_uuid']] = True
589 lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
590 lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
596 objects[l["uuid"]] = l
598 objects[l["uuid"]] = l
601 for r in root_owners:
605 contents[obr["name"]] = obr
606 if "first_name" in obr:
607 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
610 if r['owner_uuid'] not in objects:
611 contents[r['name']] = r
613 # end with llfuse.lock_released, re-acquire lock
616 self.merge(contents.items(),
618 lambda a, i: a.uuid == i[1]['uuid'],
619 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
620 except Exception as e:
624 class FileHandle(object):
625 '''Connects a numeric file handle to a File or Directory object that has
626 been opened by the client.'''
628 def __init__(self, fh, entry):
633 class Inodes(object):
634 '''Manage the set of inodes. This is the mapping from a numeric id
635 to a concrete File or Directory object'''
639 self._counter = llfuse.ROOT_INODE
641 def __getitem__(self, item):
642 return self._entries[item]
644 def __setitem__(self, key, item):
645 self._entries[key] = item
648 return self._entries.iterkeys()
651 return self._entries.items()
653 def __contains__(self, k):
654 return k in self._entries
656 def add_entry(self, entry):
657 entry.inode = self._counter
658 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Unregister `entry` from the inode table.

    Before removing it, ask llfuse to invalidate the kernel's cached data
    for the entry's inode. Raises KeyError if the inode is not registered.
    '''
    ino = entry.inode
    llfuse.invalidate_inode(ino)
    del self._entries[ino]
666 class Operations(llfuse.Operations):
667 '''This is the main interface with llfuse. The methods on this object are
668 called by llfuse threads to service FUSE events to query and read from
671 llfuse has its own global lock which is acquired before calling a request handler,
672 so request handlers do not run concurrently unless the lock is explicitly released
673 using "with llfuse.lock_released:"'''
675 def __init__(self, uid, gid):
676 super(Operations, self).__init__()
678 self.inodes = Inodes()
682 # dict of inode to filehandle
683 self._filehandles = {}
684 self._filehandles_counter = 1
686 # Other threads that need to wait until the fuse driver
687 # is fully initialized should wait() on this event object.
688 self.initlock = threading.Event()
691 # Allow threads that are waiting for the driver to be finished
692 # initializing to continue
695 def access(self, inode, mode, ctx):
698 def getattr(self, inode):
699 if inode not in self.inodes:
700 raise llfuse.FUSEError(errno.ENOENT)
702 e = self.inodes[inode]
704 entry = llfuse.EntryAttributes()
707 entry.entry_timeout = 300
708 entry.attr_timeout = 300
710 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
711 if isinstance(e, Directory):
712 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
714 entry.st_mode |= stat.S_IFREG
717 entry.st_uid = self.uid
718 entry.st_gid = self.gid
721 entry.st_size = e.size()
723 entry.st_blksize = 512
724 entry.st_blocks = (e.size()/512)
725 if e.size()/512 != 0:
728 entry.st_mtime = e.mtime()
729 entry.st_ctime = e.ctime()
733 def lookup(self, parent_inode, name):
734 _logger.debug("arv-mount lookup: parent_inode %i name %s",
741 if parent_inode in self.inodes:
742 p = self.inodes[parent_inode]
744 inode = p.parent_inode
746 inode = p[name].inode
749 return self.getattr(inode)
751 raise llfuse.FUSEError(errno.ENOENT)
753 def open(self, inode, flags):
754 if inode in self.inodes:
755 p = self.inodes[inode]
757 raise llfuse.FUSEError(errno.ENOENT)
759 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
760 raise llfuse.FUSEError(errno.EROFS)
762 if isinstance(p, Directory):
763 raise llfuse.FUSEError(errno.EISDIR)
765 fh = self._filehandles_counter
766 self._filehandles_counter += 1
767 self._filehandles[fh] = FileHandle(fh, p)
770 def read(self, fh, off, size):
771 _logger.debug("arv-mount read %i %i %i", fh, off, size)
772 if fh in self._filehandles:
773 handle = self._filehandles[fh]
775 raise llfuse.FUSEError(errno.EBADF)
778 with llfuse.lock_released:
779 return handle.entry.readfrom(off, size)
781 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget open file handle `fh`; handles that are not registered are
    silently ignored.'''
    self._filehandles.pop(fh, None)
787 def opendir(self, inode):
788 _logger.debug("arv-mount opendir: inode %i", inode)
790 if inode in self.inodes:
791 p = self.inodes[inode]
793 raise llfuse.FUSEError(errno.ENOENT)
795 if not isinstance(p, Directory):
796 raise llfuse.FUSEError(errno.ENOTDIR)
798 fh = self._filehandles_counter
799 self._filehandles_counter += 1
800 if p.parent_inode in self.inodes:
801 parent = self.inodes[p.parent_inode]
803 raise llfuse.FUSEError(errno.EIO)
805 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
808 def readdir(self, fh, off):
809 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
811 if fh in self._filehandles:
812 handle = self._filehandles[fh]
814 raise llfuse.FUSEError(errno.EBADF)
816 _logger.debug("arv-mount handle.entry %s", handle.entry)
819 while e < len(handle.entry):
820 if handle.entry[e][1].inode in self.inodes:
821 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Forget directory handle `fh` (allocated by opendir()).

    A handle that is not registered is ignored, as release() does, instead
    of letting a KeyError escape from the FUSE request handler.
    '''
    self._filehandles.pop(fh, None)
828 st = llfuse.StatvfsData()
829 st.f_bsize = 64 * 1024
842 # The llfuse documentation recommends only overriding functions that
843 # are actually implemented, as the default implementation will raise ENOSYS.
844 # However, there is a bug in the llfuse default implementation of create()
845 # "create() takes exactly 5 positional arguments (6 given)" which will crash
847 # The workaround is to implement it with the proper number of parameters,
848 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Reject file creation by always raising FUSEError(EROFS), because the
    mount is read-only.

    The five positional parameters are never used. They exist only so the
    method accepts the number of arguments llfuse passes to create().
    '''
    read_only = llfuse.FUSEError(errno.EROFS)
    raise read_only