2 # FUSE driver for Arvados Keep
8 from llfuse import FUSEError
22 _logger = logging.getLogger('arvados.arvados_fuse')
25 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
27 def sanitize_filename(dirty):
28 # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
34 if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
35 # skip control characters and /
39 # strip leading - or ~ and leading/trailing whitespace
40 stripped = fn.lstrip("-~ ").rstrip()
47 class FreshBase(object):
48 '''Base class for maintaining fresh/stale state to determine when to update.'''
52 self._last_update = time.time()
55 # Mark the value as stale
59 # Test if the entries dict is stale
64 return (self._last_update + self._poll_time) < time.time()
69 self._last_update = time.time()
78 class File(FreshBase):
79 '''Base for file objects.'''
81 def __init__(self, parent_inode, _ctime=0, _mtime=0):
82 super(File, self).__init__()
84 self.parent_inode = parent_inode
91 def readfrom(self, off, size):
101 class StreamReaderFile(File):
102 '''Wraps a StreamFileReader as a file.'''
104 def __init__(self, parent_inode, reader, _ctime, _mtime):
105 super(StreamReaderFile, self).__init__(parent_inode, _ctime, _mtime)
109 return self.reader.size()
def readfrom(self, off, size):
    '''Serve a read request of `size` at offset `off` by handing it
    straight to the wrapped stream reader (self.reader).'''
    reader = self.reader
    return reader.readfrom(off, size)
118 class StringFile(File):
119 '''Wrap a simple string as a file'''
def __init__(self, parent_inode, contents, _ctime, _mtime):
    '''Create a file whose body is the in-memory value `contents`.

    parent_inode, _ctime and _mtime are forwarded unchanged to File.
    '''
    super(StringFile, self).__init__(parent_inode, _ctime=_ctime, _mtime=_mtime)
    # Held as-is; readfrom() slices it directly.
    self.contents = contents
125 return len(self.contents)
def readfrom(self, off, size):
    '''Return the part of self.contents from index `off` up to (but not
    including) `off + size`; slicing clamps at the end of the contents.'''
    end = off + size
    return self.contents[off:end]
class ObjectFile(StringFile):
    '''Present a dict (an API record) as a file containing its JSON
    serialization, pretty-printed with sorted keys and a trailing newline.'''

    def __init__(self, parent_inode, contents):
        '''
        parent_inode: inode number of the containing directory.
        contents: the record to expose; it must contain a 'uuid' key.
            When present, 'created_at' / 'modified_at' are converted with
            convertTime() to become the file's ctime / mtime; a missing
            key yields 0.
        '''
        def _timestamp(key):
            # 0 stands in for "no timestamp available".
            return convertTime(contents[key]) if key in contents else 0

        created = _timestamp('created_at')
        modified = _timestamp('modified_at')
        body = json.dumps(contents, indent=4, sort_keys=True) + "\n"
        super(ObjectFile, self).__init__(parent_inode, body, created, modified)
        # Keep the original dict around alongside its serialized form.
        self.contentsdict = contents
        self.uuid = contents['uuid']
141 class Directory(FreshBase):
142 '''Generic directory object, backed by a dict.
143 Consists of a set of entries with the key representing the filename
144 and the value referencing a File or Directory object.
147 def __init__(self, parent_inode):
148 super(Directory, self).__init__()
150 '''parent_inode is the integer inode number'''
152 if not isinstance(parent_inode, int):
153 raise Exception("parent_inode should be an int")
154 self.parent_inode = parent_inode
157 # Overriden by subclasses to implement logic to update the entries dict
158 # when the directory is stale
162 # Only used when computing the size of the disk footprint of the directory
167 def checkupdate(self):
171 except apiclient.errors.HttpError as e:
174 def __getitem__(self, item):
176 return self._entries[item]
180 return self._entries.items()
184 return self._entries.iterkeys()
186 def __contains__(self, k):
188 return k in self._entries
190 def merge(self, items, fn, same, new_entry):
191 '''Helper method for updating the contents of the directory. Takes a list
192 describing the new contents of the directory, reuse entries that are
193 the same in both the old and new lists, create new entries, and delete
194 old entries missing from the new list.
196 items: iterable with new directory contents
198 fn: function to take an entry in 'items' and return the desired file or
199 directory name, or None if this entry should be skipped
201 same: function to compare an existing entry (a File or Directory
202 object) with an entry in the items list to determine whether to keep
205 new_entry: function to create a new directory entry (File or Directory
206 object) from an entry in the items list.
210 oldentries = self._entries
213 name = sanitize_filename(fn(i))
215 if name in oldentries and same(oldentries[name], i):
216 # move existing directory entry over
217 self._entries[name] = oldentries[name]
220 # create new directory entry
223 self._entries[name] = self.inodes.add_entry(ent)
225 # delete any other directory entries that were not in found in 'items'
227 llfuse.invalidate_entry(self.inode, str(i))
228 self.inodes.del_entry(oldentries[i])
232 '''Delete all entries'''
233 oldentries = self._entries
236 if isinstance(n, Directory):
238 llfuse.invalidate_entry(self.inode, str(n))
239 self.inodes.del_entry(oldentries[n])
243 class CollectionDirectory(Directory):
244 '''Represents the root of a directory tree holding a collection.'''
246 def __init__(self, parent_inode, inodes, api, collection_locator):
247 super(CollectionDirectory, self).__init__(parent_inode)
250 self.collection_locator = collection_locator
251 self.manifest_text_file = None
253 self.collection_object = None
256 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
260 if re.match(r'^[a-f0-9]{32}', self.collection_locator):
262 #with llfuse.lock_released:
263 new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
264 if "portable_data_hash" not in new_collection_object:
265 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
267 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
268 self.collection_object = new_collection_object
270 if self.manifest_text_file is not None:
271 self.manifest_text_file.contents = self.collection_object["manifest_text"]
272 self.manifest_text_file._ctime = self.ctime()
273 self.manifest_text_file._mtime = self.mtime()
274 if self.pdh_file is not None:
275 self.pdh_file.contents = self.collection_object["portable_data_hash"]
276 self.pdh_file._ctime = self.ctime()
277 self.pdh_file._mtime = self.mtime()
280 collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
281 for s in collection.all_streams():
283 for part in s.name().split('/'):
284 if part != '' and part != '.':
285 partname = sanitize_filename(part)
286 if partname not in cwd._entries:
287 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
288 cwd = cwd._entries[partname]
289 for k, v in s.files().items():
290 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.ctime(), self.mtime()))
292 except Exception as detail:
293 _logger.error("arv-mount %s: error", self.collection_locator)
294 _logger.exception(detail)
297 def __getitem__(self, item):
299 if item == '.manifest_text':
300 if self.manifest_text_file is None:
301 self.manifest_text_file = StringFile(self.inode, self.collection_object["manifest_text"], self.ctime(), self.mtime())
302 self.inodes.add_entry(self.manifest_text_file)
303 return self.manifest_text_file
304 elif item == '.portable_data_hash':
305 if self.pdh_file is None:
306 self.pdh_file = StringFile(self.inode, self.collection_object["portable_data_hash"], self.ctime(), self.mtime())
307 self.inodes.add_entry(self.pdh_file)
310 return super(CollectionDirectory, self).__getitem__(item)
312 def __contains__(self, k):
313 if k in ('.manifest_text', '.portable_data_hash'):
316 return super(CollectionDirectory, self).__contains__(k)
320 return convertTime(self.collection_object["created_at"])
324 return convertTime(self.collection_object["modified_at"])
326 class MagicDirectory(Directory):
327 '''A special directory that logically contains the set of all extant keep
328 locators. When a file is referenced by lookup(), it is tested to see if it
329 is a valid keep locator to a manifest, and if so, loads the manifest
330 contents as a subdirectory of this directory with the locator as the
331 directory name. Since querying a list of all extant keep locators is
332 impractical, only collections that have already been accessed are visible
336 def __init__(self, parent_inode, inodes, api):
337 super(MagicDirectory, self).__init__(parent_inode)
341 def __contains__(self, k):
342 if k in self._entries:
345 e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
351 except Exception as e:
352 _logger.debug('arv-mount exception keep %s', e)
355 def __getitem__(self, item):
357 return self._entries[item]
359 raise KeyError("No collection with id " + item)
361 class RecursiveInvalidateDirectory(Directory):
362 def invalidate(self):
363 if self.inode == llfuse.ROOT_INODE:
364 llfuse.lock.acquire()
366 super(RecursiveInvalidateDirectory, self).invalidate()
367 for a in self._entries:
368 self._entries[a].invalidate()
369 except Exception as e:
372 if self.inode == llfuse.ROOT_INODE:
373 llfuse.lock.release()
375 class TagsDirectory(RecursiveInvalidateDirectory):
376 '''A special directory that contains as subdirectories all tags visible to the user.'''
378 def __init__(self, parent_inode, inodes, api, poll_time=60):
379 super(TagsDirectory, self).__init__(parent_inode)
383 # arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
386 self._poll_time = poll_time
389 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
391 self.merge(tags['items'],
392 lambda i: i['name'] if 'name' in i else i['uuid'],
393 lambda a, i: a.tag == i,
394 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
396 class TagDirectory(Directory):
397 '''A special directory that contains as subdirectories all collections visible
398 to the user that are tagged with a particular tag.
401 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
402 super(TagDirectory, self).__init__(parent_inode)
407 self._poll_time = poll_time
410 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
411 ['name', '=', self.tag],
412 ['head_uuid', 'is_a', 'arvados#collection']],
413 select=['head_uuid']).execute()
414 self.merge(taggedcollections['items'],
415 lambda i: i['head_uuid'],
416 lambda a, i: a.collection_locator == i['head_uuid'],
417 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))
420 class ProjectDirectory(RecursiveInvalidateDirectory):
421 '''A special directory that contains the contents of a project.'''
423 def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
424 super(ProjectDirectory, self).__init__(parent_inode)
427 self.project_object = project_object
428 self.uuid = project_object['uuid']
430 def createDirectory(self, i):
431 if re.match(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}', i['uuid']):
432 return CollectionDirectory(self.inode, self.inodes, self.api, i['uuid'])
433 elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
434 return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
435 elif re.match(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}', i['uuid']) and i['head_kind'] == 'arvados#collection':
436 return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
437 #elif re.match(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}', i['uuid']):
439 elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
440 return ObjectFile(self.parent_inode, i)
447 if i['name'] is None:
449 elif re.match(r'[a-z0-9]{5}-(4zz18|j7d0g)-[a-z0-9]{15}', i['uuid']):
450 # collection or subproject
452 elif re.match(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}', i['uuid']) and i['head_kind'] == 'arvados#collection':
455 elif 'kind' in i and i['kind'].startswith('arvados#'):
457 return "{}.{}".format(i['name'], i['kind'][8:])
462 if isinstance(a, CollectionDirectory):
463 return a.collection_locator == i['uuid']
464 elif isinstance(a, ProjectDirectory):
465 return a.uuid == i['uuid']
466 elif isinstance(a, ObjectFile):
467 return a.uuid == i['uuid'] and not a.stale()
470 #with llfuse.lock_released:
471 if re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', self.uuid):
472 self.project_object = self.api.groups().get(uuid=self.uuid).execute()
473 elif re.match(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}', self.uuid):
474 self.project_object = self.api.users().get(uuid=self.uuid).execute()
476 contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
477 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
478 contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
485 self.createDirectory)
488 return convertTime(self.project_object["created_at"]) if "created_at" in self.project_object else 0
491 return convertTime(self.project_object["modified_at"]) if "modified_at" in self.project_object else 0
495 class SharedDirectory(RecursiveInvalidateDirectory):
496 '''A special directory that represents users or groups who have shared projects with me.'''
498 def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
499 super(SharedDirectory, self).__init__(parent_inode)
500 self.current_user = api.users().current().execute()
505 # arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
508 self._poll_time = poll_time
511 #with llfuse.lock_released:
512 all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
514 for ob in all_projects:
515 objects[ob['uuid']] = ob
519 for ob in all_projects:
520 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
522 root_owners[ob['owner_uuid']] = True
524 #with llfuse.lock_released:
525 lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
526 lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
532 objects[l["uuid"]] = l
534 objects[l["uuid"]] = l
537 for r in root_owners:
541 contents[obr["name"]] = obr
542 if "first_name" in obr:
543 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
546 if r['owner_uuid'] not in objects:
547 contents[r['name']] = r
550 self.merge(contents.items(),
552 lambda a, i: a.uuid == i[1]['uuid'],
553 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
554 except Exception as e:
558 class FileHandle(object):
559 '''Connects a numeric file handle to a File or Directory object that has
560 been opened by the client.'''
562 def __init__(self, fh, entry):
567 class Inodes(object):
568 '''Manage the set of inodes. This is the mapping from a numeric id
569 to a concrete File or Directory object'''
573 self._counter = llfuse.ROOT_INODE
575 def __getitem__(self, item):
576 return self._entries[item]
578 def __setitem__(self, key, item):
579 self._entries[key] = item
582 return self._entries.iterkeys()
585 return self._entries.items()
587 def __contains__(self, k):
588 return k in self._entries
590 def add_entry(self, entry):
591 entry.inode = self._counter
592 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Remove `entry` from the inode table.

    llfuse.invalidate_inode() is called for the entry's inode number
    before the table slot is deleted.
    '''
    number = entry.inode
    llfuse.invalidate_inode(number)
    del self._entries[number]
600 class Operations(llfuse.Operations):
601 '''This is the main interface with llfuse. The methods on this object are
602 called by llfuse threads to service FUSE events to query and read from
605 llfuse has its own global lock which is acquired before calling a request handler,
606 so request handlers do not run concurrently unless the lock is explicitly released
607 with llfuse.lock_released.'''
609 def __init__(self, uid, gid):
610 super(Operations, self).__init__()
612 self.inodes = Inodes()
616 # dict of inode to filehandle
617 self._filehandles = {}
618 self._filehandles_counter = 1
620 # Other threads that need to wait until the fuse driver
621 # is fully initialized should wait() on this event object.
622 self.initlock = threading.Event()
625 # Allow threads that are waiting for the driver to be finished
626 # initializing to continue
629 def access(self, inode, mode, ctx):
632 def getattr(self, inode):
633 if inode not in self.inodes:
634 raise llfuse.FUSEError(errno.ENOENT)
636 e = self.inodes[inode]
638 entry = llfuse.EntryAttributes()
641 entry.entry_timeout = 300
642 entry.attr_timeout = 300
644 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
645 if isinstance(e, Directory):
646 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
648 entry.st_mode |= stat.S_IFREG
651 entry.st_uid = self.uid
652 entry.st_gid = self.gid
655 entry.st_size = e.size()
657 entry.st_blksize = 512
658 entry.st_blocks = (e.size()/512)
659 if e.size()/512 != 0:
662 entry.st_mtime = e.mtime()
663 entry.st_ctime = e.ctime()
667 def lookup(self, parent_inode, name):
668 _logger.debug("arv-mount lookup: parent_inode %i name %s",
675 if parent_inode in self.inodes:
676 p = self.inodes[parent_inode]
678 inode = p.parent_inode
680 inode = p[name].inode
683 return self.getattr(inode)
685 raise llfuse.FUSEError(errno.ENOENT)
687 def open(self, inode, flags):
688 if inode in self.inodes:
689 p = self.inodes[inode]
691 raise llfuse.FUSEError(errno.ENOENT)
693 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
694 raise llfuse.FUSEError(errno.EROFS)
696 if isinstance(p, Directory):
697 raise llfuse.FUSEError(errno.EISDIR)
699 fh = self._filehandles_counter
700 self._filehandles_counter += 1
701 self._filehandles[fh] = FileHandle(fh, p)
704 def read(self, fh, off, size):
705 _logger.debug("arv-mount read %i %i %i", fh, off, size)
706 if fh in self._filehandles:
707 handle = self._filehandles[fh]
709 raise llfuse.FUSEError(errno.EBADF)
712 with llfuse.lock_released:
713 return handle.entry.readfrom(off, size)
715 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget file handle `fh`; a handle that is not in the table is
    silently ignored.'''
    self._filehandles.pop(fh, None)
721 def opendir(self, inode):
722 _logger.debug("arv-mount opendir: inode %i", inode)
724 if inode in self.inodes:
725 p = self.inodes[inode]
727 raise llfuse.FUSEError(errno.ENOENT)
729 if not isinstance(p, Directory):
730 raise llfuse.FUSEError(errno.ENOTDIR)
732 fh = self._filehandles_counter
733 self._filehandles_counter += 1
734 if p.parent_inode in self.inodes:
735 parent = self.inodes[p.parent_inode]
737 raise llfuse.FUSEError(errno.EIO)
739 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
742 def readdir(self, fh, off):
743 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
745 if fh in self._filehandles:
746 handle = self._filehandles[fh]
748 raise llfuse.FUSEError(errno.EBADF)
750 _logger.debug("arv-mount handle.entry %s", handle.entry)
753 while e < len(handle.entry):
754 if handle.entry[e][1].inode in self.inodes:
755 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Forget directory handle `fh`.

    An unknown handle is ignored, matching release(), instead of raising
    KeyError out of the FUSE request handler.
    '''
    self._filehandles.pop(fh, None)
762 st = llfuse.StatvfsData()
763 st.f_bsize = 64 * 1024
776 # The llfuse documentation recommends only overloading functions that
777 # are actually implemented, as the default implementation will raise ENOSYS.
778 # However, there is a bug in the llfuse default implementation of create()
779 # "create() takes exactly 5 positional arguments (6 given)" which will crash
781 # The workaround is to implement it with the proper number of parameters,
782 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Refuse to create files: always fails with EROFS.

    The five positional parameters exist only so the arity matches what
    llfuse passes (presumably parent inode, name, mode, flags, ctx —
    confirm against the llfuse version in use); none of them are read.
    '''
    raise FUSEError(errno.EROFS)