2 # FUSE driver for Arvados Keep
20 _logger = logging.getLogger('arvados.arvados_fuse')
23 from llfuse import FUSEError
25 class FreshBase(object):
26 '''Base class for maintaining fresh/stale state to determine when to update.'''
30 self._last_update = time()
33 # Mark the value as stale
37 # Test if the entries dict is stale
42 return (self._last_update + self._poll_time) < time()
47 self._last_update = time()
50 class File(FreshBase):
51 '''Base for file objects.'''
53 def __init__(self, parent_inode):
54 super(File, self).__init__()
56 self.parent_inode = parent_inode
61 def readfrom(self, off, size):
65 class StreamReaderFile(File):
66 '''Wraps a StreamFileReader as a file.'''
68 def __init__(self, parent_inode, reader):
69 super(StreamReaderFile, self).__init__(parent_inode)
73 return self.reader.size()
def readfrom(self, off, size):
    '''Forward a read request to the wrapped reader.

    off, size: passed through unchanged to self.reader.readfrom().
    Returns whatever the wrapped reader returns.
    '''
    wrapped = self.reader
    return wrapped.readfrom(off, size)
82 class ObjectFile(File):
83 '''Wraps a dict as a serialized json object.'''
def __init__(self, parent_inode, contents):
    '''Keep the dict and pre-render it as the text served by this file.

    parent_inode: handed on to the File base class.
    contents: dict to expose; it must contain a 'uuid' key, which is
              copied to self.uuid.
    '''
    super(ObjectFile, self).__init__(parent_inode)
    self.contentsdict = contents
    self.uuid = contents['uuid']
    # Sorted keys and a fixed indent keep the rendered JSON stable.
    self.contents = json.dumps(contents, sort_keys=True, indent=4)
92 return len(self.contents)
def readfrom(self, off, size):
    '''Return the part of the serialized text starting at off, at most size long.'''
    end = off + size
    return self.contents[off:end]
98 class Directory(FreshBase):
99 '''Generic directory object, backed by a dict.
100 Consists of a set of entries with the key representing the filename
101 and the value referencing a File or Directory object.
104 def __init__(self, parent_inode):
105 super(Directory, self).__init__()
107 '''parent_inode is the integer inode number'''
109 if not isinstance(parent_inode, int):
110 raise Exception("parent_inode should be an int")
111 self.parent_inode = parent_inode
114 # Overridden by subclasses to implement logic to update the entries dict
115 # when the directory is stale
119 # Only used when computing the size of the disk footprint of the directory
124 def checkupdate(self):
128 except apiclient.errors.HttpError as e:
131 def __getitem__(self, item):
133 return self._entries[item]
137 return self._entries.items()
141 return self._entries.iterkeys()
143 def __contains__(self, k):
145 return k in self._entries
147 def merge(self, items, fn, same, new_entry):
148 '''Helper method for updating the contents of the directory.
150 items: array with new directory contents
152 fn: function to take an entry in 'items' and return the desired file or
155 same: function to compare an existing entry with an entry in the items
156 list to determine whether to keep the existing entry.
158 new_entry: function to create a new directory entry from array entry.
161 oldentries = self._entries
165 if n in oldentries and same(oldentries[n], i):
166 self._entries[n] = oldentries[n]
169 self._entries[n] = self.inodes.add_entry(new_entry(i))
171 llfuse.invalidate_entry(self.inode, str(n))
172 self.inodes.del_entry(oldentries[n])
176 class CollectionDirectory(Directory):
177 '''Represents the root of a directory tree holding a collection.'''
179 def __init__(self, parent_inode, inodes, collection_locator):
180 super(CollectionDirectory, self).__init__(parent_inode)
182 self.collection_locator = collection_locator
185 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
189 collection = arvados.CollectionReader(self.collection_locator)
190 for s in collection.all_streams():
192 for part in s.name().split('/'):
193 if part != '' and part != '.':
194 if part not in cwd._entries:
195 cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
196 cwd = cwd._entries[part]
197 for k, v in s.files().items():
198 cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
201 except Exception as detail:
202 _logger.debug("arv-mount %s: error: %s",
203 self.collection_locator, detail)
206 class MagicDirectory(Directory):
207 '''A special directory that logically contains the set of all extant keep
208 locators. When a file is referenced by lookup(), it is tested to see if it
209 is a valid keep locator to a manifest, and if so, loads the manifest
210 contents as a subdirectory of this directory with the locator as the
211 directory name. Since querying a list of all extant keep locators is
212 impractical, only collections that have already been accessed are visible
216 def __init__(self, parent_inode, inodes):
217 super(MagicDirectory, self).__init__(parent_inode)
220 def __contains__(self, k):
221 if k in self._entries:
224 e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, k))
230 except Exception as e:
231 _logger.debug('arv-mount exception keep %s', e)
234 def __getitem__(self, item):
236 return self._entries[item]
238 raise KeyError("No collection with id " + item)
240 class TagsDirectory(Directory):
241 '''A special directory that contains as subdirectories all tags visible to the user.'''
243 def __init__(self, parent_inode, inodes, api, poll_time=60):
244 super(TagsDirectory, self).__init__(parent_inode)
248 arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
251 self._poll_time = poll_time
253 def invalidate(self):
255 super(TagsDirectory, self).invalidate()
256 for a in self._entries:
257 self._entries[a].invalidate()
260 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
262 self.merge(tags['items'],
264 lambda a, i: a.tag == i,
265 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
267 class TagDirectory(Directory):
268 '''A special directory that contains as subdirectories all collections visible
269 to the user that are tagged with a particular tag.
272 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
273 super(TagDirectory, self).__init__(parent_inode)
278 self._poll_time = poll_time
281 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
282 ['name', '=', self.tag],
283 ['head_uuid', 'is_a', 'arvados#collection']],
284 select=['head_uuid']).execute()
285 self.merge(taggedcollections['items'],
286 lambda i: i['head_uuid'],
287 lambda a, i: a.collection_locator == i['head_uuid'],
288 lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
291 class GroupsDirectory(Directory):
292 '''A special directory that contains as subdirectories all groups visible to the user.'''
294 def __init__(self, parent_inode, inodes, api, poll_time=60):
295 super(GroupsDirectory, self).__init__(parent_inode)
299 arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
302 self._poll_time = poll_time
304 def invalidate(self):
306 super(GroupsDirectory, self).invalidate()
307 for a in self._entries:
308 self._entries[a].invalidate()
311 groups = self.api.groups().list(
312 filters=[['group_class','=','project']]).execute()
313 self.merge(groups['items'],
315 lambda a, i: a.uuid == i['uuid'],
316 lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
319 class GroupDirectory(Directory):
320 '''A special directory that contains the contents of a group.'''
322 def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
323 super(GroupDirectory, self).__init__(parent_inode)
326 self.uuid = uuid['uuid']
328 self._poll_time = poll_time
330 def invalidate(self):
332 super(GroupDirectory, self).invalidate()
333 for a in self._entries:
334 self._entries[a].invalidate()
def createDirectory(self, i):
    '''Build the entry object for one item of this group's contents.

    i: item record (dict); the form of i['uuid'] selects the entry type:
       - 32 hex digits, '+', digits          -> CollectionDirectory
       - xxxxx-j7d0g-xxxxxxxxxxxxxxx          -> GroupDirectory
       - xxxxx-xxxxx-xxxxxxxxxxxxxxx (other)  -> ObjectFile

    Returns the new entry, or None when the uuid matches no pattern.

    Every entry is created with this directory's own inode as its parent.
    lookup() and opendir() resolve '..' through parent_inode, so passing
    self.parent_inode (as the group and object branches used to) made '..'
    inside a nested entry skip one level.
    '''
    uuid = i['uuid']
    if re.match(r'[0-9a-f]{32}\+\d+', uuid):
        return CollectionDirectory(self.inode, self.inodes, uuid)
    if re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', uuid):
        return GroupDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
    if re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', uuid):
        return ObjectFile(self.inode, i)
    return None
346 contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
348 for a in contents['links']:
349 links[a['head_uuid']] = a['name']
352 if i['uuid'] in links:
353 return links[i['uuid']]
358 if isinstance(a, CollectionDirectory):
359 return a.collection_locator == i['uuid']
360 elif isinstance(a, GroupDirectory):
361 return a.uuid == i['uuid']
362 elif isinstance(a, ObjectFile):
363 return a.uuid == i['uuid'] and not a.stale()
366 self.merge(contents['items'],
369 self.createDirectory)
372 class FileHandle(object):
373 '''Connects a numeric file handle to a File or Directory object that has
374 been opened by the client.'''
376 def __init__(self, fh, entry):
381 class Inodes(object):
382 '''Manage the set of inodes. This is the mapping from a numeric id
383 to a concrete File or Directory object'''
387 self._counter = llfuse.ROOT_INODE
def __getitem__(self, item):
    '''Return the entry registered under inode number item.'''
    table = self._entries
    return table[item]
def __setitem__(self, key, item):
    '''Register item under inode number key, replacing any previous entry.'''
    table = self._entries
    table[key] = item
396 return self._entries.iterkeys()
399 return self._entries.items()
def __contains__(self, k):
    '''Tell whether inode number k is currently registered.'''
    table = self._entries
    return k in table
404 def add_entry(self, entry):
405 entry.inode = self._counter
406 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Remove entry from the inode table.

    llfuse is asked to invalidate the entry's inode first; the mapping
    for that inode number is deleted afterwards.
    '''
    ino = entry.inode
    llfuse.invalidate_inode(ino)
    del self._entries[ino]
414 class Operations(llfuse.Operations):
415 '''This is the main interface with llfuse. The methods on this object are
416 called by llfuse threads to service FUSE events to query and read from
419 llfuse has its own global lock which is acquired before calling a request handler,
420 so request handlers do not run concurrently unless the lock is explicitly released
421 with llfuse.lock_released.'''
423 def __init__(self, uid, gid):
424 super(Operations, self).__init__()
426 self.inodes = Inodes()
430 # dict of inode to filehandle
431 self._filehandles = {}
432 self._filehandles_counter = 1
434 # Other threads that need to wait until the fuse driver
435 # is fully initialized should wait() on this event object.
436 self.initlock = threading.Event()
439 # Allow threads that are waiting for the driver to be finished
440 # initializing to continue
443 def access(self, inode, mode, ctx):
446 def getattr(self, inode):
447 if inode not in self.inodes:
448 raise llfuse.FUSEError(errno.ENOENT)
450 e = self.inodes[inode]
452 entry = llfuse.EntryAttributes()
455 entry.entry_timeout = 300
456 entry.attr_timeout = 300
458 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
459 if isinstance(e, Directory):
460 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
462 entry.st_mode |= stat.S_IFREG
465 entry.st_uid = self.uid
466 entry.st_gid = self.gid
469 entry.st_size = e.size()
471 entry.st_blksize = 1024
472 entry.st_blocks = e.size()/1024
473 if e.size()/1024 != 0:
481 def lookup(self, parent_inode, name):
482 _logger.debug("arv-mount lookup: parent_inode %i name %s",
489 if parent_inode in self.inodes:
490 p = self.inodes[parent_inode]
492 inode = p.parent_inode
494 inode = p[name].inode
497 return self.getattr(inode)
499 raise llfuse.FUSEError(errno.ENOENT)
501 def open(self, inode, flags):
502 if inode in self.inodes:
503 p = self.inodes[inode]
505 raise llfuse.FUSEError(errno.ENOENT)
507 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
508 raise llfuse.FUSEError(errno.EROFS)
510 if isinstance(p, Directory):
511 raise llfuse.FUSEError(errno.EISDIR)
513 fh = self._filehandles_counter
514 self._filehandles_counter += 1
515 self._filehandles[fh] = FileHandle(fh, p)
518 def read(self, fh, off, size):
519 _logger.debug("arv-mount read %i %i %i", fh, off, size)
520 if fh in self._filehandles:
521 handle = self._filehandles[fh]
523 raise llfuse.FUSEError(errno.EBADF)
526 with llfuse.lock_released:
527 return handle.entry.readfrom(off, size)
529 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the file handle fh; an unknown handle is silently ignored.'''
    self._filehandles.pop(fh, None)
535 def opendir(self, inode):
536 _logger.debug("arv-mount opendir: inode %i", inode)
538 if inode in self.inodes:
539 p = self.inodes[inode]
541 raise llfuse.FUSEError(errno.ENOENT)
543 if not isinstance(p, Directory):
544 raise llfuse.FUSEError(errno.ENOTDIR)
546 fh = self._filehandles_counter
547 self._filehandles_counter += 1
548 if p.parent_inode in self.inodes:
549 parent = self.inodes[p.parent_inode]
551 raise llfuse.FUSEError(errno.EIO)
553 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
556 def readdir(self, fh, off):
557 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
559 if fh in self._filehandles:
560 handle = self._filehandles[fh]
562 raise llfuse.FUSEError(errno.EBADF)
564 _logger.debug("arv-mount handle.entry %s", handle.entry)
567 while e < len(handle.entry):
568 if handle.entry[e][1].inode in self.inodes:
569 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Forget the directory handle fh.

    Behaves like release(): a handle that is not (or no longer) in the
    table is ignored instead of raising KeyError out of the request handler.
    '''
    self._filehandles.pop(fh, None)
576 st = llfuse.StatvfsData()
577 st.f_bsize = 1024 * 1024
590 # The llfuse documentation recommends only overloading functions that
591 # are actually implemented, as the default implementation will raise ENOSYS.
592 # However, there is a bug in the llfuse default implementation of create()
593 # "create() takes exactly 5 positional arguments (6 given)" which will crash
595 # The workaround is to implement it with the proper number of parameters,
596 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Refuse every create request with EROFS (read-only file system).

    The five positional parameters exist only so the handler can be called
    with the number of arguments it is given; none of them is inspected.
    '''
    read_only = llfuse.FUSEError(errno.EROFS)
    raise read_only