2 # FUSE driver for Arvados Keep
8 from llfuse import FUSEError
# Module-level logger used throughout this driver for FUSE request tracing
# (debug) and collection-load failures (error/exception).
22 _logger = logging.getLogger('arvados.arvados_fuse')
# Body of convertTime (its def line is outside this excerpt). Parses an API
# timestamp string of the exact form "YYYY-MM-DDTHH:MM:SSZ" and returns integer
# seconds since the Unix epoch; calendar.timegm interprets the parsed
# struct_time as UTC, so no local timezone offset is applied.
# NOTE(review): a value with fractional seconds (e.g. "...:05.123Z") does not
# match this format and strptime would raise ValueError -- confirm the API
# never emits that form.
25 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
27 def sanitize_filename(dirty):
# Produce a string that is safe to expose as a single path component:
# ASCII control characters and '/' are dropped, then leading '-', '~' and
# spaces plus trailing whitespace are stripped. Several body lines are omitted
# from this excerpt, so the handling of None or of an all-stripped result is
# not visible here.
28 # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
34 if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
35 # skip control characters and /
39 # strip leading '-', '~' and spaces, and trailing whitespace
# NOTE(review): lstrip only removes the literal space character on the left,
# while rstrip() removes any trailing whitespace; tabs and other control
# characters are presumably already filtered out by the loop above.
40 stripped = fn.lstrip("-~ ").rstrip()
47 class FreshBase(object):
48 '''Base class for maintaining fresh/stale state to determine when to update.'''
# Wall-clock time (time.time(), seconds) of the last refresh; the staleness
# test below compares against it.
52 self._last_update = time.time()
55 # Mark the value as stale
59 # Test if the entries dict is stale
# Time-based staleness: true once more than _poll_time seconds have elapsed
# since _last_update. The surrounding lines (presumably a check of the _poll
# flag) are not shown in this excerpt.
64 return (self._last_update + self._poll_time) < time.time()
# Refresh point: restart the poll interval from now.
69 self._last_update = time.time()
78 class File(FreshBase):
79 '''Base for file objects.'''
# parent_inode: inode number of the containing directory.
# _ctime/_mtime: timestamps for the file, default 0; subclasses pass values
# produced by convertTime (seconds since the epoch). Operations.getattr reads
# them back through e.ctime()/e.mtime().
81 def __init__(self, parent_inode, _ctime=0, _mtime=0):
82 super(File, self).__init__()
84 self.parent_inode = parent_inode
# Return up to `size` bytes of content starting at offset `off`; overridden
# by StreamReaderFile and StringFile (the base body is not shown here).
91 def readfrom(self, off, size):
101 class StreamReaderFile(File):
102 '''Wraps a StreamFileReader as a file.'''
# `reader` is one of the file objects from a manifest stream's files()
# (see CollectionDirectory); size and reads are delegated to it.
104 def __init__(self, parent_inode, reader, _ctime, _mtime):
105 super(StreamReaderFile, self).__init__(parent_inode, _ctime, _mtime)
109 return self.reader.size()
# Delegates straight to the reader. Operations.read invokes this with the
# llfuse lock released, so a slow fetch does not block other FUSE requests.
111 def readfrom(self, off, size):
112 return self.reader.readfrom(off, size)
118 class StringFile(File):
119 '''Wrap a simple string as a file'''
# `contents` is held in memory and may be replaced later (CollectionDirectory
# rewrites .contents of its .manifest_text / .portable_data_hash files).
120 def __init__(self, parent_inode, contents, _ctime, _mtime):
121 super(StringFile, self).__init__(parent_inode, _ctime, _mtime)
122 self.contents = contents
125 return len(self.contents)
# Slicing clamps at the end of the string, so a read past EOF returns an
# empty string rather than raising.
127 def readfrom(self, off, size):
128 return self.contents[off:(off+size)]
130 class ObjectFile(StringFile):
131 '''Wrap a dict as a serialized json object.'''
# `contents` is an API record dict. created_at/modified_at are optional
# (timestamps default to 0 when absent); 'uuid' is required and a missing key
# raises KeyError. The file body is JSON indented by 4 with sorted keys and a
# trailing newline, so the text is deterministic for a given dict.
133 def __init__(self, parent_inode, contents):
134 _ctime = convertTime(contents['created_at']) if 'created_at' in contents else 0
135 _mtime = convertTime(contents['modified_at']) if 'modified_at' in contents else 0
136 super(ObjectFile, self).__init__(parent_inode, json.dumps(contents, indent=4, sort_keys=True)+"\n", _ctime, _mtime)
137 self.contentsdict = contents
138 self.uuid = self.contentsdict['uuid']
141 class Directory(FreshBase):
142 '''Generic directory object, backed by a dict.
143 Consists of a set of entries with the key representing the filename
144 and the value referencing a File or Directory object.
147 def __init__(self, parent_inode):
148 super(Directory, self).__init__()
# This object's own inode number is assigned later by Inodes.add_entry, which
# sets .inode on the object it registers.
150 '''parent_inode is the integer inode number'''
152 if not isinstance(parent_inode, int):
153 raise Exception("parent_inode should be an int")
154 self.parent_inode = parent_inode
157 # Overridden by subclasses to implement logic to update the entries dict
158 # when the directory is stale
162 # Only used when computing the size of the disk footprint of the directory
# checkupdate: refresh wrapper whose body is partly omitted here; it catches
# apiclient HttpError raised during the refresh (what it does with the error
# is not visible in this excerpt).
167 def checkupdate(self):
171 except apiclient.errors.HttpError as e:
# Mapping-style accessors over _entries (name -> File/Directory object).
# Subclasses such as CollectionDirectory and MagicDirectory override
# __getitem__/__contains__ to add virtual or lazily-loaded entries.
174 def __getitem__(self, item):
176 return self._entries[item]
180 return self._entries.items()
# NOTE(review): dict.iterkeys exists only in Python 2.
184 return self._entries.iterkeys()
186 def __contains__(self, k):
188 return k in self._entries
190 def merge(self, items, fn, same, new_entry):
191 '''Helper method for updating the contents of the directory. Takes a list
192 describing the new contents of the directory, reuse entries that are
193 the same in both the old and new lists, create new entries, and delete
194 old entries missing from the new list.
196 items: iterable with new directory contents
198 fn: function to take an entry in 'items' and return the desired file or
199 directory name, or None if this entry should be skipped
201 same: function to compare an existing entry (a File or Directory
202 object) with an entry in the items list to determine whether to keep
205 new_entry: function to create a new directory entry (File or Directory
206 object) from an entry in the items list.
210 oldentries = self._entries
# The old mapping is kept so unchanged entries can be moved across and the
# leftovers deleted afterwards; the lines that start a fresh self._entries and
# loop over `items` are omitted from this excerpt.
# NOTE(review): fn may return None (see the docstring), and the result is
# passed to sanitize_filename before any None check visible here -- confirm
# sanitize_filename tolerates None.
213 name = sanitize_filename(fn(i))
215 if name in oldentries and same(oldentries[name], i):
216 # move existing directory entry over
217 self._entries[name] = oldentries[name]
220 # create new directory entry
223 self._entries[name] = self.inodes.add_entry(ent)
225 # delete any other directory entries that were not found in 'items'
# Drop the kernel's cached directory entry for the removed name, then
# unregister (and invalidate) its inode.
227 llfuse.invalidate_entry(self.inode, str(i))
228 self.inodes.del_entry(oldentries[i])
232 '''Delete all entries'''
233 oldentries = self._entries
# Subdirectories are presumably cleared recursively first (the line under
# this isinstance check is not shown); every entry is then invalidated in the
# kernel cache and unregistered from the inode table.
236 if isinstance(n, Directory):
238 llfuse.invalidate_entry(self.inode, str(n))
239 self.inodes.del_entry(oldentries[n])
243 class CollectionDirectory(Directory):
244 '''Represents the root of a directory tree holding a collection.'''
# collection_locator may be a collection uuid or a portable data hash (both
# are compared against it below). The collection record is fetched by the
# refresh code, so collection_object starts out as None.
246 def __init__(self, parent_inode, inodes, api, collection_locator):
247 super(CollectionDirectory, self).__init__(parent_inode)
250 self.collection_locator = collection_locator
251 self.manifest_text_file = None
253 self.collection_object = None
# Presumably the body of a `same`-style matcher (def line not shown): an item
# matches when either its uuid or its portable data hash equals the locator.
256 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
# Refresh: fetch the collection record.
# NOTE(review): the lock_released wrapper is commented out, so per the
# Operations docstring this API request runs with the global llfuse lock
# held, blocking all other FUSE requests while it is in flight.
260 #with llfuse.lock_released:
261 new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
# Records lacking portable_data_hash fall back to the uuid as their content
# identity, so the change test below still works.
262 if "portable_data_hash" not in new_collection_object:
263 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
# Rebuild only on first load or when the content identity has changed.
265 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
266 self.collection_object = new_collection_object
# Keep already-created virtual files in sync with the new record.
268 if self.manifest_text_file is not None:
269 self.manifest_text_file.contents = self.collection_object["manifest_text"]
270 self.manifest_text_file._ctime = self.ctime()
271 self.manifest_text_file._mtime = self.mtime()
272 if self.pdh_file is not None:
273 self.pdh_file.contents = self.collection_object["portable_data_hash"]
274 self.pdh_file._ctime = self.ctime()
275 self.pdh_file._mtime = self.mtime()
# Materialize the manifest as a directory tree. Each stream name is split on
# '/' into nested Directory entries, created on demand, and each file in the
# stream becomes a StreamReaderFile. `cwd` is presumably reset to self for
# each stream in a line omitted from this excerpt.
278 collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
279 for s in collection.all_streams():
281 for part in s.name().split('/'):
282 if part != '' and part != '.':
283 partname = sanitize_filename(part)
284 if partname not in cwd._entries:
285 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
286 cwd = cwd._entries[partname]
287 for k, v in s.files().items():
# NOTE(review): if two file names sanitize to the same string, the later
# assignment replaces the earlier entry while the earlier inode stays
# registered in self.inodes.
288 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.ctime(), self.mtime()))
# Any failure is logged with a traceback and not re-raised by this handler.
291 except Exception as detail:
292 _logger.error("arv-mount %s: error", self.collection_locator)
293 _logger.exception(detail)
# The virtual entries '.manifest_text' and '.portable_data_hash' are created
# on first access and registered with self.inodes; every other name falls
# through to the normal entry table. This assumes collection_object has
# already been loaded (indexing None would raise TypeError).
296 def __getitem__(self, item):
298 if item == '.manifest_text':
299 if self.manifest_text_file is None:
300 self.manifest_text_file = StringFile(self.inode, self.collection_object["manifest_text"], self.ctime(), self.mtime())
301 self.inodes.add_entry(self.manifest_text_file)
302 return self.manifest_text_file
303 elif item == '.portable_data_hash':
304 if self.pdh_file is None:
305 self.pdh_file = StringFile(self.inode, self.collection_object["portable_data_hash"], self.ctime(), self.mtime())
306 self.inodes.add_entry(self.pdh_file)
309 return super(CollectionDirectory, self).__getitem__(item)
311 def __contains__(self, k):
312 if k in ('.manifest_text', '.portable_data_hash'):
315 return super(CollectionDirectory, self).__contains__(k)
# Timestamps come from the collection record.
# NOTE(review): unlike ProjectDirectory, these do not guard against a missing
# created_at/modified_at key or an unloaded (None) collection_object.
319 return convertTime(self.collection_object["created_at"])
323 return convertTime(self.collection_object["modified_at"])
325 class MagicDirectory(Directory):
326 '''A special directory that logically contains the set of all extant keep
327 locators. When a file is referenced by lookup(), it is tested to see if it
328 is a valid keep locator to a manifest, and if so, loads the manifest
329 contents as a subdirectory of this directory with the locator as the
330 directory name. Since querying a list of all extant keep locators is
331 impractical, only collections that have already been accessed are visible
335 def __init__(self, parent_inode, inodes, api):
336 super(MagicDirectory, self).__init__(parent_inode)
# The membership test doubles as the loader: an unknown name k is tried as a
# collection locator by registering a CollectionDirectory for it. Failures are
# logged at debug level only.
# NOTE(review): the entry is added to self.inodes before the lines that
# validate it (omitted here); confirm that a failed load does not leave it
# registered in the inode table.
340 def __contains__(self, k):
341 if k in self._entries:
344 e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
350 except Exception as e:
351 _logger.debug('arv-mount exception keep %s', e)
# Returns an already-loaded entry; names that could not be resolved raise
# KeyError, as a dict would. The guard before the return is not shown.
354 def __getitem__(self, item):
356 return self._entries[item]
358 raise KeyError("No collection with id " + item)
360 class RecursiveInvalidateDirectory(Directory):
# Directory whose invalidate() also invalidates every child entry, so that one
# call marks a whole subtree stale.
361 def invalidate(self):
# Only the call on the mount-root inode takes the global llfuse lock; the
# recursive calls on descendants run under it. invalidate() is wired to an
# event-subscription callback (see TagsDirectory), which presumably runs
# outside any FUSE request handler and so does not already hold the lock.
# NOTE(review): when this directory is not the root inode no lock is taken
# at all -- confirm that non-root callers always hold it already.
362 if self.inode == llfuse.ROOT_INODE:
363 llfuse.lock.acquire()
365 super(RecursiveInvalidateDirectory, self).invalidate()
366 for a in self._entries:
367 self._entries[a].invalidate()
368 except Exception as e:
# Presumably inside the (omitted) finally clause: release only what was
# acquired above.
371 if self.inode == llfuse.ROOT_INODE:
372 llfuse.lock.release()
374 class TagsDirectory(RecursiveInvalidateDirectory):
375 '''A special directory that contains as subdirectories all tags visible to the user.'''
377 def __init__(self, parent_inode, inodes, api, poll_time=60):
378 super(TagsDirectory, self).__init__(parent_inode)
# Any change to a link object invalidates the whole tag tree (recursively).
# poll_time is presumably the fallback refresh interval when subscribing is
# not possible (the surrounding lines are not shown).
382 arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
385 self._poll_time = poll_time
# Fetch the distinct tag names.
# NOTE(review): a single list().execute() call -- unlike arvados.util.list_all
# used by ProjectDirectory/HomeDirectory -- may return only the first page of
# results.
388 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
# One TagDirectory subdirectory per tag name.
390 self.merge(tags['items'],
391 lambda i: i['name'] if 'name' in i else i['uuid'],
# NOTE(review): a.tag is the tag-name string given to TagDirectory (see the
# new_entry lambda below), but `i` is a whole item dict, so this comparison
# is never true. Every refresh therefore discards and recreates all
# TagDirectory entries with new inodes. Likely intended: a.tag == i['name'].
392 lambda a, i: a.tag == i,
# NOTE(review): reads i['name'] unconditionally, although the name function
# above allows for items without a 'name' key.
393 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
395 class TagDirectory(Directory):
396 '''A special directory that contains as subdirectories all collections visible
397 to the user that are tagged with a particular tag.
400 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
# tag: the tag name, matched against the link 'name' field in the query below.
# poll_time: staleness interval used by the FreshBase test.
401 super(TagDirectory, self).__init__(parent_inode)
406 self._poll_time = poll_time
# List tag links with this name whose head is a collection. Each tagged
# collection becomes a CollectionDirectory named by its uuid.
# NOTE(review): a single list().execute() call may return only the first page
# of results (compare arvados.util.list_all used in ProjectDirectory).
409 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
410 ['name', '=', self.tag],
411 ['head_uuid', 'is_a', 'arvados#collection']],
412 select=['head_uuid']).execute()
413 self.merge(taggedcollections['items'],
414 lambda i: i['head_uuid'],
415 lambda a, i: a.collection_locator == i['head_uuid'],
416 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))
419 class ProjectDirectory(RecursiveInvalidateDirectory):
420 '''A special directory that contains the contents of a project.'''
# project_object: the group (or user) record; its 'uuid' is required.
422 def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
423 super(ProjectDirectory, self).__init__(parent_inode)
426 self.project_object = project_object
427 self.uuid = project_object['uuid']
# Factory passed to merge(). Maps a contents item to an entry by the type
# code in its uuid:
#   4zz18 -> CollectionDirectory
#   j7d0g -> nested ProjectDirectory
#   o0j2j whose head is a collection -> CollectionDirectory of the head
#   any other well-formed uuid -> ObjectFile holding the record as JSON
429 def createDirectory(self, i):
430 if re.match(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}', i['uuid']):
431 return CollectionDirectory(self.inode, self.inodes, self.api, i['uuid'])
432 elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
433 return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
434 elif re.match(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}', i['uuid']) and i['head_kind'] == 'arvados#collection':
435 return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
436 #elif re.match(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}', i['uuid']):
438 elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
# NOTE(review): every other branch parents the new entry on self.inode, but
# this one passes self.parent_inode, so the ObjectFile records this
# project's parent as its parent even though it is listed inside this
# project. Likely intended: ObjectFile(self.inode, i).
439 return ObjectFile(self.parent_inode, i)
# Name function for merge(). Items with a None name are skipped (the return
# on the next line is not shown). Objects other than collections, projects
# and collection links get a ".<kind>" suffix: i['kind'][8:] drops the
# 8-character "arvados#" prefix.
446 if i['name'] is None:
448 elif re.match(r'[a-z0-9]{5}-(4zz18|j7d0g)-[a-z0-9]{15}', i['uuid']):
449 # collection or subproject
451 elif re.match(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}', i['uuid']) and i['head_kind'] == 'arvados#collection':
454 elif 'kind' in i and i['kind'].startswith('arvados#'):
456 return "{}.{}".format(i['name'], i['kind'][8:])
# `same` function for merge(). Collections match on locator and projects on
# uuid. ObjectFiles are reused only while not stale, so their JSON is
# rebuilt once the poll interval expires.
461 if isinstance(a, CollectionDirectory):
462 return a.collection_locator == i['uuid']
463 elif isinstance(a, ProjectDirectory):
464 return a.uuid == i['uuid']
465 elif isinstance(a, ObjectFile):
466 return a.uuid == i['uuid'] and not a.stale()
# Refresh: re-fetch this project's own record (group or user, chosen by the
# uuid type code), then list its contents.
# NOTE(review): lock_released is commented out, so these API calls run with
# the global llfuse lock held.
469 #with llfuse.lock_released:
470 if re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', self.uuid):
471 self.project_object = self.api.groups().get(uuid=self.uuid).execute()
472 elif re.match(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}', self.uuid):
473 self.project_object = self.api.users().get(uuid=self.uuid).execute()
475 contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
476 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
477 contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
484 self.createDirectory)
# Timestamps fall back to 0 when the record lacks them.
487 return convertTime(self.project_object["created_at"]) if "created_at" in self.project_object else 0
490 return convertTime(self.project_object["modified_at"]) if "modified_at" in self.project_object else 0
494 class HomeDirectory(RecursiveInvalidateDirectory):
495 '''A special directory that represents users or groups who have shared projects with me.'''
497 def __init__(self, parent_inode, inodes, api, poll=False, poll_time=60):
498 super(HomeDirectory, self).__init__(parent_inode)
# Makes an API request at construction time to learn the current user.
499 self.current_user = api.users().current().execute()
# Event-driven invalidation is disabled (commented out); refresh presumably
# relies on polling with _poll_time.
504 # arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
507 self._poll_time = poll_time
# Refresh: load every project visible to the user, indexed by uuid.
510 #with llfuse.lock_released:
511 all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
513 for ob in all_projects:
514 objects[ob['uuid']] = ob
# A project's owner is a "root owner" (shown at top level) when it is the
# current user or is not itself one of the visible projects, i.e. the top of
# each visible ownership chain.
518 for ob in all_projects:
519 if ob['owner_uuid'] == self.current_user['uuid'] or ob['owner_uuid'] not in objects:
521 root_owners[ob['owner_uuid']] = True
# Resolve the root owners to user and group records so they can be named.
523 #with llfuse.lock_released:
524 lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
525 lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
531 objects[l["uuid"]] = l
533 objects[l["uuid"]] = l
# Build the name -> record map for the top level. Root owners are added by
# "name" and, for user records, also by "first last"; projects whose owner
# is unknown are added by name (the enclosing loop is not shown).
536 for r in root_owners:
540 contents[obr["name"]] = obr
541 if "first_name" in obr:
# NOTE(review): assumes 'last_name' is present whenever 'first_name' is;
# a missing key raises KeyError, presumably caught by the except clause at
# the end of this refresh.
542 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
545 if r['owner_uuid'] not in objects:
546 contents[r['name']] = r
# Each top-level entry becomes a ProjectDirectory. Items are (name, record)
# pairs from contents.items(), hence the i[1] indexing.
549 self.merge(contents.items(),
551 lambda a, i: a.uuid == i[1]['uuid'],
552 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
553 except Exception as e:
557 class FileHandle(object):
558 '''Connects a numeric file handle to a File or Directory object that has
559 been opened by the client.'''
# fh: the handle number handed back to llfuse.
# entry: for a file handle, the opened File object. For a directory handle,
# Operations.opendir passes a list of (name, entry) pairs, a snapshot of the
# listing taken when the directory was opened.
561 def __init__(self, fh, entry):
566 class Inodes(object):
567 '''Manage the set of inodes. This is the mapping from a numeric id
568 to a concrete File or Directory object'''
# Numbering starts at llfuse.ROOT_INODE, so the first entry registered
# receives the root inode number.
572 self._counter = llfuse.ROOT_INODE
574 def __getitem__(self, item):
575 return self._entries[item]
577 def __setitem__(self, key, item):
578 self._entries[key] = item
# NOTE(review): dict.iterkeys exists only in Python 2.
581 return self._entries.iterkeys()
584 return self._entries.items()
586 def __contains__(self, k):
587 return k in self._entries
# Give `entry` the next inode number (stored on entry.inode) and register it.
# Callers use the return value as the entry itself (for example
# `cwd._entries[partname] = self.inodes.add_entry(...)` followed by
# `cwd.inode`), so the omitted lines presumably advance _counter and return
# entry.
589 def add_entry(self, entry):
590 entry.inode = self._counter
591 self._entries[entry.inode] = entry
# Unregister an entry. The kernel is first told to invalidate its cached data
# for the inode, then the entry is removed from the table; raises KeyError if
# the inode is not registered.
595 def del_entry(self, entry):
596 llfuse.invalidate_inode(entry.inode)
597 del self._entries[entry.inode]
599 class Operations(llfuse.Operations):
600 '''This is the main interface with llfuse. The methods on this object are
601 called by llfuse threads to service FUSE events to query and read from
604 llfuse has its own global lock which is acquired before calling a request handler,
605 so request handlers do not run concurrently unless the lock is explicitly released
606 with llfuse.lock_released.'''
# uid/gid are reported as the owner of every entry (see getattr).
608 def __init__(self, uid, gid):
609 super(Operations, self).__init__()
611 self.inodes = Inodes()
615 # dict of file handle number to FileHandle (shared by files and directories)
616 self._filehandles = {}
617 self._filehandles_counter = 1
619 # Other threads that need to wait until the fuse driver
620 # is fully initialized should wait() on this event object.
621 self.initlock = threading.Event()
624 # Allow threads that are waiting for the driver to be finished
625 # initializing to continue
628 def access(self, inode, mode, ctx):
# Build the attributes for `inode`; raises ENOENT for unknown inodes. The
# mount is read-only: the mode grants read to user/group/other, plus
# execute/search on directories. Entry and attribute timeouts are 300
# seconds, so the kernel may keep serving cached results for up to that long.
631 def getattr(self, inode):
632 if inode not in self.inodes:
633 raise llfuse.FUSEError(errno.ENOENT)
635 e = self.inodes[inode]
637 entry = llfuse.EntryAttributes()
640 entry.entry_timeout = 300
641 entry.attr_timeout = 300
643 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
644 if isinstance(e, Directory):
645 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
647 entry.st_mode |= stat.S_IFREG
650 entry.st_uid = self.uid
651 entry.st_gid = self.gid
654 entry.st_size = e.size()
656 entry.st_blksize = 512
# st_blocks is counted in 512-byte units.
# NOTE(review): `/` relies on Python 2 integer division; under Python 3 it
# yields a float. The following test checks the quotient, not the remainder.
# If the omitted body bumps st_blocks by one to round up, then
# `e.size() % 512 != 0` is probably what was meant. As written, every file
# of 512 bytes or more would get an extra block and files under 512 bytes
# would report zero blocks.
657 entry.st_blocks = (e.size()/512)
658 if e.size()/512 != 0:
661 entry.st_mtime = e.mtime()
662 entry.st_ctime = e.ctime()
# Resolve `name` inside directory `parent_inode`. Presumably the '..' branch
# yields the directory's parent_inode. Other names are looked up through the
# directory's own __getitem__, after a membership check that is not shown;
# for MagicDirectory that check can load a collection on demand. Raises
# ENOENT when nothing matches.
666 def lookup(self, parent_inode, name):
667 _logger.debug("arv-mount lookup: parent_inode %i name %s",
674 if parent_inode in self.inodes:
675 p = self.inodes[parent_inode]
677 inode = p.parent_inode
679 inode = p[name].inode
682 return self.getattr(inode)
684 raise llfuse.FUSEError(errno.ENOENT)
# Open a file read-only. Write access is rejected with EROFS and directories
# with EISDIR. Each open takes the next number from the handle counter it
# shares with opendir.
686 def open(self, inode, flags):
687 if inode in self.inodes:
688 p = self.inodes[inode]
690 raise llfuse.FUSEError(errno.ENOENT)
692 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
693 raise llfuse.FUSEError(errno.EROFS)
695 if isinstance(p, Directory):
696 raise llfuse.FUSEError(errno.EISDIR)
698 fh = self._filehandles_counter
699 self._filehandles_counter += 1
700 self._filehandles[fh] = FileHandle(fh, p)
# Unknown handles give EBADF. The read itself runs with the llfuse lock
# released, so a slow fetch does not stall other requests; failures surface
# to the client as EIO.
703 def read(self, fh, off, size):
704 _logger.debug("arv-mount read %i %i %i", fh, off, size)
705 if fh in self._filehandles:
706 handle = self._filehandles[fh]
708 raise llfuse.FUSEError(errno.EBADF)
711 with llfuse.lock_released:
712 return handle.entry.readfrom(off, size)
714 raise llfuse.FUSEError(errno.EIO)
# Forget a file handle; unknown handles are silently ignored.
716 def release(self, fh):
717 if fh in self._filehandles:
718 del self._filehandles[fh]
# Snapshot the directory listing at open time, including '.' and '..'.
# readdir serves from this list, so offsets stay consistent even if the
# directory is refreshed meanwhile. Raises ENOENT for an unknown inode,
# ENOTDIR for a file, and EIO if the parent inode is no longer registered.
720 def opendir(self, inode):
721 _logger.debug("arv-mount opendir: inode %i", inode)
723 if inode in self.inodes:
724 p = self.inodes[inode]
726 raise llfuse.FUSEError(errno.ENOENT)
728 if not isinstance(p, Directory):
729 raise llfuse.FUSEError(errno.ENOTDIR)
731 fh = self._filehandles_counter
732 self._filehandles_counter += 1
733 if p.parent_inode in self.inodes:
734 parent = self.inodes[p.parent_inode]
736 raise llfuse.FUSEError(errno.EIO)
738 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
# Generator of (name, attributes, next_offset) tuples from the opendir
# snapshot, presumably starting at index `off` (the initialisation of `e` is
# not shown). Entries whose inode has since been unregistered are skipped.
741 def readdir(self, fh, off):
742 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
744 if fh in self._filehandles:
745 handle = self._filehandles[fh]
747 raise llfuse.FUSEError(errno.EBADF)
749 _logger.debug("arv-mount handle.entry %s", handle.entry)
752 while e < len(handle.entry):
753 if handle.entry[e][1].inode in self.inodes:
754 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
# NOTE(review): unlike release(), this raises KeyError for an unknown handle
# instead of ignoring it.
757 def releasedir(self, fh):
758 del self._filehandles[fh]
# Filesystem statistics (method header and remaining fields not shown).
761 st = llfuse.StatvfsData()
762 st.f_bsize = 64 * 1024
775 # The llfuse documentation recommends only overloading functions that
776 # are actually implemented, as the default implementation will raise ENOSYS.
777 # However, there is a bug in the llfuse default implementation of create()
778 # "create() takes exactly 5 positional arguments (6 given)" which will crash
780 # The workaround is to implement it with the proper number of parameters,
781 # and then everything works out.
# Always refuses: the mount is read-only.
782 def create(self, p1, p2, p3, p4, p5):
783 raise llfuse.FUSEError(errno.EROFS)