2 # FUSE driver for Arvados Keep
21 from llfuse import FUSEError
23 class FreshBase(object):
24 '''Base class for maintaining fresh/stale state to determine when to update.'''
28 self._last_update = time()
31 # Mark the value as stale
35 # Test if the entries dict is stale
40 return (self._last_update + self._poll_time) < time()
45 self._last_update = time()
48 class File(FreshBase):
49 '''Base for file objects.'''
51 def __init__(self, parent_inode):
52 super(File, self).__init__()
54 self.parent_inode = parent_inode
59 def readfrom(self, off, size):
63 class StreamReaderFile(File):
64 '''Wraps a StreamFileReader as a file.'''
66 def __init__(self, parent_inode, reader):
67 super(StreamReaderFile, self).__init__(parent_inode)
71 return self.reader.size()
def readfrom(self, off, size):
    '''Read from the wrapped reader.

    Forwards ``off`` and ``size`` unchanged to ``self.reader.readfrom`` and
    returns whatever that call returns.
    '''
    reader = self.reader
    data = reader.readfrom(off, size)
    return data
80 class ObjectFile(File):
81 '''Wraps a dict as a serialized json object.'''
def __init__(self, parent_inode, contents):
    '''Build a file whose body is the JSON rendering of ``contents``.

    parent_inode: passed straight through to the base class initializer.

    contents: dict to serialize; it must have a 'uuid' key, which is
    copied to ``self.uuid``.
    '''
    super(ObjectFile, self).__init__(parent_inode)
    self.contentsdict = contents
    self.uuid = contents['uuid']
    # Serialized once here; sorted keys and a 4-space indent keep the
    # text stable for a given dict.
    self.contents = json.dumps(contents, indent=4, sort_keys=True)
90 return len(self.contents)
def readfrom(self, off, size):
    '''Return the slice of ``self.contents`` starting at ``off`` and at most
    ``size`` items long.

    Ordinary slice semantics apply, so a range past the end yields a
    shorter (possibly empty) result rather than an error.
    '''
    end = off + size
    return self.contents[off:end]
96 class Directory(FreshBase):
97 '''Generic directory object, backed by a dict.
98 Consists of a set of entries with the key representing the filename
99 and the value referencing a File or Directory object.
102 def __init__(self, parent_inode):
103 super(Directory, self).__init__()
105 '''parent_inode is the integer inode number'''
107 if not isinstance(parent_inode, int):
108 raise Exception("parent_inode should be an int")
109 self.parent_inode = parent_inode
112 # Overridden by subclasses to implement logic to update the entries dict
113 # when the directory is stale
117 # Only used when computing the size of the disk footprint of the directory
122 def checkupdate(self):
126 except apiclient.errors.HttpError as e:
129 def __getitem__(self, item):
131 return self._entries[item]
135 return self._entries.items()
139 return self._entries.iterkeys()
141 def __contains__(self, k):
143 return k in self._entries
145 def merge(self, items, fn, same, new_entry):
146 '''Helper method for updating the contents of the directory.
148 items: array with new directory contents
150 fn: function to take an entry in 'items' and return the desired file or
153 same: function to compare an existing entry with an entry in the items
154 list to determine whether to keep the existing entry.
156 new_entry: function to create a new directory entry from array entry.
159 oldentries = self._entries
163 if n in oldentries and same(oldentries[n], i):
164 self._entries[n] = oldentries[n]
167 self._entries[n] = self.inodes.add_entry(new_entry(i))
169 llfuse.invalidate_entry(self.inode, str(n))
170 self.inodes.del_entry(oldentries[n])
174 class CollectionDirectory(Directory):
175 '''Represents the root of a directory tree holding a collection.'''
177 def __init__(self, parent_inode, inodes, collection_locator):
178 super(CollectionDirectory, self).__init__(parent_inode)
180 self.collection_locator = collection_locator
183 return i['uuid'] == self.collection_locator
187 collection = arvados.CollectionReader(self.collection_locator)
188 for s in collection.all_streams():
190 for part in s.name().split('/'):
191 if part != '' and part != '.':
192 if part not in cwd._entries:
193 cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
194 cwd = cwd._entries[part]
195 for k, v in s.files().items():
196 cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
199 except Exception as detail:
200 logging.debug("arv-mount %s: error: %s" % (self.collection_locator,detail))
203 class MagicDirectory(Directory):
204 '''A special directory that logically contains the set of all extant keep
205 locators. When a file is referenced by lookup(), it is tested to see if it
206 is a valid keep locator to a manifest, and if so, loads the manifest
207 contents as a subdirectory of this directory with the locator as the
208 directory name. Since querying a list of all extant keep locators is
209 impractical, only collections that have already been accessed are visible
213 def __init__(self, parent_inode, inodes):
214 super(MagicDirectory, self).__init__(parent_inode)
217 def __contains__(self, k):
218 if k in self._entries:
221 e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, k))
227 except Exception as e:
228 logging.debug('arv-mount exception keep %s', e)
231 def __getitem__(self, item):
233 return self._entries[item]
235 raise KeyError("No collection with id " + item)
237 class TagsDirectory(Directory):
238 '''A special directory that contains as subdirectories all tags visible to the user.'''
240 def __init__(self, parent_inode, inodes, api, poll_time=60):
241 super(TagsDirectory, self).__init__(parent_inode)
245 arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
248 self._poll_time = poll_time
250 def invalidate(self):
252 super(TagsDirectory, self).invalidate()
253 for a in self._entries:
254 self._entries[a].invalidate()
257 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
259 self.merge(tags['items'],
261 lambda a, i: a.tag == i,
262 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
264 class TagDirectory(Directory):
265 '''A special directory that contains as subdirectories all collections visible
266 to the user that are tagged with a particular tag.
269 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
270 super(TagDirectory, self).__init__(parent_inode)
275 self._poll_time = poll_time
278 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
279 ['name', '=', self.tag],
280 ['head_uuid', 'is_a', 'arvados#collection']],
281 select=['head_uuid']).execute()
282 self.merge(taggedcollections['items'],
283 lambda i: i['head_uuid'],
284 lambda a, i: a.collection_locator == i['head_uuid'],
285 lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
288 class GroupsDirectory(Directory):
289 '''A special directory that contains as subdirectories all groups visible to the user.'''
291 def __init__(self, parent_inode, inodes, api, poll_time=60):
292 super(GroupsDirectory, self).__init__(parent_inode)
296 arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
299 self._poll_time = poll_time
301 def invalidate(self):
303 super(GroupsDirectory, self).invalidate()
304 for a in self._entries:
305 self._entries[a].invalidate()
308 groups = self.api.groups().list().execute()
309 self.merge(groups['items'],
311 lambda a, i: a.uuid == i['uuid'],
312 lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
315 class GroupDirectory(Directory):
316 '''A special directory that contains the contents of a group.'''
318 def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
319 super(GroupDirectory, self).__init__(parent_inode)
322 self.uuid = uuid['uuid']
324 self._poll_time = poll_time
326 def invalidate(self):
328 super(GroupDirectory, self).invalidate()
329 for a in self._entries:
330 self._entries[a].invalidate()
332 def createDirectory(self, i):
333 if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
334 return CollectionDirectory(self.inode, self.inodes, i['uuid'])
335 elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
336 return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
337 elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
338 return ObjectFile(self.parent_inode, i)
342 contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
344 for a in contents['links']:
345 links[a['head_uuid']] = a['name']
348 if i['uuid'] in links:
349 return links[i['uuid']]
354 if isinstance(a, CollectionDirectory):
355 return a.collection_locator == i['uuid']
356 elif isinstance(a, GroupDirectory):
357 return a.uuid == i['uuid']
358 elif isinstance(a, ObjectFile):
359 return a.uuid == i['uuid'] and not a.stale()
362 self.merge(contents['items'],
365 self.createDirectory)
368 class FileHandle(object):
369 '''Connects a numeric file handle to a File or Directory object that has
370 been opened by the client.'''
372 def __init__(self, fh, entry):
377 class Inodes(object):
378 '''Manage the set of inodes. This is the mapping from a numeric id
379 to a concrete File or Directory object'''
383 self._counter = llfuse.ROOT_INODE
385 def __getitem__(self, item):
386 return self._entries[item]
388 def __setitem__(self, key, item):
389 self._entries[key] = item
392 return self._entries.iterkeys()
395 return self._entries.items()
397 def __contains__(self, k):
398 return k in self._entries
400 def add_entry(self, entry):
401 entry.inode = self._counter
402 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Remove ``entry`` from the inode table.

    Calls ``llfuse.invalidate_inode`` with the entry's inode number first,
    then deletes that inode number from ``self._entries``.
    '''
    inode = entry.inode
    llfuse.invalidate_inode(inode)
    del self._entries[inode]
410 class Operations(llfuse.Operations):
411 '''This is the main interface with llfuse. The methods on this object are
412 called by llfuse threads to service FUSE events to query and read from
415 llfuse has its own global lock which is acquired before calling a request handler,
416 so request handlers do not run concurrently unless the lock is explicitly released
417 with llfuse.lock_released.'''
419 def __init__(self, uid, gid, debug=False):
420 super(Operations, self).__init__()
423 logging.basicConfig(level=logging.DEBUG)
424 logging.info("arv-mount debug enabled")
426 self.inodes = Inodes()
430 # dict of inode to filehandle
431 self._filehandles = {}
432 self._filehandles_counter = 1
434 # Other threads that need to wait until the fuse driver
435 # is fully initialized should wait() on this event object.
436 self.initlock = threading.Event()
439 # Allow threads that are waiting for the driver to be finished
440 # initializing to continue
443 def access(self, inode, mode, ctx):
446 def getattr(self, inode):
447 if inode not in self.inodes:
448 raise llfuse.FUSEError(errno.ENOENT)
450 e = self.inodes[inode]
452 entry = llfuse.EntryAttributes()
455 entry.entry_timeout = 300
456 entry.attr_timeout = 300
458 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
459 if isinstance(e, Directory):
460 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
462 entry.st_mode |= stat.S_IFREG
465 entry.st_uid = self.uid
466 entry.st_gid = self.gid
469 entry.st_size = e.size()
471 entry.st_blksize = 1024
472 entry.st_blocks = e.size()/1024
473 if e.size()/1024 != 0:
481 def lookup(self, parent_inode, name):
482 logging.debug("arv-mount lookup: parent_inode %i name %s", parent_inode, name)
488 if parent_inode in self.inodes:
489 p = self.inodes[parent_inode]
491 inode = p.parent_inode
493 inode = p[name].inode
496 return self.getattr(inode)
498 raise llfuse.FUSEError(errno.ENOENT)
500 def open(self, inode, flags):
501 if inode in self.inodes:
502 p = self.inodes[inode]
504 raise llfuse.FUSEError(errno.ENOENT)
506 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
507 raise llfuse.FUSEError(errno.EROFS)
509 if isinstance(p, Directory):
510 raise llfuse.FUSEError(errno.EISDIR)
512 fh = self._filehandles_counter
513 self._filehandles_counter += 1
514 self._filehandles[fh] = FileHandle(fh, p)
517 def read(self, fh, off, size):
518 logging.debug("arv-mount read %i %i %i", fh, off, size)
519 if fh in self._filehandles:
520 handle = self._filehandles[fh]
522 raise llfuse.FUSEError(errno.EBADF)
525 with llfuse.lock_released:
526 return handle.entry.readfrom(off, size)
528 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the handle ``fh``.

    A handle that is not present in ``self._filehandles`` is ignored.
    '''
    # pop() with a default removes the handle when present and is a no-op
    # otherwise, matching a guarded delete.
    self._filehandles.pop(fh, None)
534 def opendir(self, inode):
535 logging.debug("arv-mount opendir: inode %i", inode)
537 if inode in self.inodes:
538 p = self.inodes[inode]
540 raise llfuse.FUSEError(errno.ENOENT)
542 if not isinstance(p, Directory):
543 raise llfuse.FUSEError(errno.ENOTDIR)
545 fh = self._filehandles_counter
546 self._filehandles_counter += 1
547 if p.parent_inode in self.inodes:
548 parent = self.inodes[p.parent_inode]
550 raise llfuse.FUSEError(errno.EIO)
552 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
555 def readdir(self, fh, off):
556 logging.debug("arv-mount readdir: fh %i off %i", fh, off)
558 if fh in self._filehandles:
559 handle = self._filehandles[fh]
561 raise llfuse.FUSEError(errno.EBADF)
563 logging.debug("arv-mount handle.entry %s", handle.entry)
566 while e < len(handle.entry):
567 if handle.entry[e][1].inode in self.inodes:
568 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Forget the directory handle ``fh``.

    Unlike a guarded removal, this deletes unconditionally: an ``fh`` that
    is not in ``self._filehandles`` raises KeyError.
    '''
    handles = self._filehandles
    del handles[fh]
575 st = llfuse.StatvfsData()
576 st.f_bsize = 1024 * 1024
589 # The llfuse documentation recommends only overloading functions that
590 # are actually implemented, as the default implementation will raise ENOSYS.
591 # However, there is a bug in the llfuse default implementation of create()
592 # "create() takes exactly 5 positional arguments (6 given)" which will crash
594 # The workaround is to implement it with the proper number of parameters,
595 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Reject every create request.

    The five positional parameters are accepted and ignored; the method
    always raises ``llfuse.FUSEError`` carrying ``errno.EROFS``.
    '''
    read_only = llfuse.FUSEError(errno.EROFS)
    raise read_only