2 # FUSE driver for Arvados Keep
5 from __future__ import print_function
22 from llfuse import FUSEError
# Base class tracking whether cached state is fresh or needs reloading.
# NOTE(review): this listing is an excerpt — lines carry their upstream line
# numbers and several lines are missing (26-28, 30-31, 33-35, 37-40, 42-45),
# so the method headers around the statements below are not visible here.
24 class FreshBase(object):
25 '''Base class for maintaining fresh/stale state to determine when to update.'''
29 self._last_update = time()
32 # Mark the value as stale
36 # Test if the entries dict is stale
# Time-based staleness: true once more than _poll_time seconds have elapsed
# since _last_update (the conditions guarding this return are not visible).
41 return (self._last_update + self._poll_time) < time()
# Records the current time as the moment of the last refresh.
46 self._last_update = time()
# Base class for file-like entries in the mounted tree.
49 class File(FreshBase):
50 '''Base for file objects.'''
52 def __init__(self, parent_inode):
53 super(File, self).__init__()
# Inode number of the containing directory (not type-checked here, unlike Directory).
55 self.parent_inode = parent_inode
# Read hook overridden by subclasses; its base body (lines 61-63) is not in this listing.
60 def readfrom(self, off, size):
# File entry backed by a stream file reader; size and reads are delegated to it.
64 class StreamReaderFile(File):
65 '''Wraps a StreamFileReader as a file.'''
67 def __init__(self, parent_inode, reader):
68 super(StreamReaderFile, self).__init__(parent_inode)
# NOTE(review): lines 69-71 are missing; presumably they store `reader` as
# self.reader and open the size() method — confirm against the full source.
72 return self.reader.size()
# Reads go straight to the wrapped reader with the same offset/size.
74 def readfrom(self, off, size):
75 return self.reader.readfrom(off, size)
# Read-only file whose content is a pretty-printed JSON dump of an API record.
81 class ObjectFile(File):
82 '''Wraps a dict as a serialized json object.'''
84 def __init__(self, parent_inode, contents):
85 super(ObjectFile, self).__init__(parent_inode)
86 self.contentsdict = contents
# Raises KeyError if the record has no 'uuid' key.
87 self.uuid = self.contentsdict['uuid']
# Serialized once at construction; later changes to the dict are not reflected.
88 self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
# Size is the length of the serialized JSON (the enclosing def line is not in this listing).
91 return len(self.contents)
# Slice semantics: reads past the end yield a short or empty string.
93 def readfrom(self, off, size):
94 return self.contents[off:(off+size)]
# Dict-backed directory node; subclasses refresh _entries when stale.
97 class Directory(FreshBase):
98 '''Generic directory object, backed by a dict.
99 Consists of a set of entries with the key representing the filename
100 and the value referencing a File or Directory object.
103 def __init__(self, parent_inode):
104 super(Directory, self).__init__()
106 '''parent_inode is the integer inode number'''
108 if not isinstance(parent_inode, int):
109 raise Exception("parent_inode should be an int")
110 self.parent_inode = parent_inode
113 # Overridden by subclasses to implement logic to update the entries dict
114 # when the directory is stale
118 # Only used when computing the size of the disk footprint of the directory
123 def checkupdate(self):
# Only API-client HTTP errors are caught here; the try body and the handler
# body (lines 124-126, 128) are not present in this listing.
127 except apiclient.errors.HttpError as e:
130 def __getitem__(self, item):
132 return self._entries[item]
136 return self._entries.items()
# Python 2 only: dict.iterkeys() does not exist in Python 3.
140 return self._entries.iterkeys()
142 def __contains__(self, k):
144 return k in self._entries
146 def merge(self, items, fn, same, new_entry):
147 '''Helper method for updating the contents of the directory.
149 items: array with new directory contents
151 fn: function to take an entry in 'items' and return the desired file or
154 same: function to compare an existing entry with an entry in the items
155 list to determine whether to keep the existing entry.
157 new_entry: function to create a new directory entry from array entry.
160 oldentries = self._entries
# NOTE(review): line 161 and line 166 are missing from this listing. The logic
# below is only correct if 161 rebinds self._entries to a fresh dict (otherwise
# oldentries aliases it) and 166 removes kept entries from oldentries (otherwise
# the cleanup loop would delete inodes that were just kept) — confirm.
164 if n in oldentries and same(oldentries[n], i):
165 self._entries[n] = oldentries[n]
# New or changed entries get a freshly allocated inode.
168 self._entries[n] = self.inodes.add_entry(new_entry(i))
# NOTE(review): under Python 2 (this file uses dict.iterkeys), str() on a
# non-ASCII unicode name raises UnicodeEncodeError — confirm names are ASCII
# or encode them explicitly.
170 llfuse.invalidate_entry(self.inode, str(n))
171 self.inodes.del_entry(oldentries[n])
# Directory tree built from one collection's streams and files.
175 class CollectionDirectory(Directory):
176 '''Represents the root of a directory tree holding a collection.'''
178 def __init__(self, parent_inode, inodes, collection_locator):
179 super(CollectionDirectory, self).__init__(parent_inode)
181 self.collection_locator = collection_locator
# Item comparison (enclosing def, line 183, not shown): true when the item's
# uuid equals this directory's locator.
184 return i['uuid'] == self.collection_locator
188 collection = arvados.CollectionReader(self.collection_locator)
# Each stream name is split on '/', skipping '' and '.' parts; missing
# intermediate Directory nodes are created, then the stream's files are
# attached to the innermost one.
189 for s in collection.all_streams():
191 for part in s.name().split('/'):
192 if part != '' and part != '.':
193 if part not in cwd._entries:
194 cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
195 cwd = cwd._entries[part]
196 for k, v in s.files().items():
197 cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
# NOTE(review): every exception is swallowed and logged only at DEBUG level,
# so a failed load is invisible unless debug logging is on. Consider a higher
# level (or logging.exception) and lazy '%s' arguments instead of '%' formatting.
200 except Exception as detail:
201 logging.debug("arv-mount %s: error: %s" % (self.collection_locator,detail))
# Directory whose entries appear lazily when a collection locator is looked up.
204 class MagicDirectory(Directory):
205 '''A special directory that logically contains the set of all extant keep
206 locators. When a file is referenced by lookup(), it is tested to see if it
207 is a valid keep locator to a manifest, and if so, loads the manifest
208 contents as a subdirectory of this directory with the locator as the
209 directory name. Since querying a list of all extant keep locators is
210 impractical, only collections that have already been accessed are visible
214 def __init__(self, parent_inode, inodes):
215 super(MagicDirectory, self).__init__(parent_inode)
# Membership test doubles as lazy loading: an unknown name is treated as a
# collection locator and a CollectionDirectory inode is allocated for it.
# NOTE(review): lines 220-227 are missing, so it is not visible how invalid
# names are rejected or whether the allocated inode is released on failure.
218 def __contains__(self, k):
219 if k in self._entries:
222 e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, k))
228 except Exception as e:
229 logging.debug('arv-mount exception keep %s', e)
232 def __getitem__(self, item):
234 return self._entries[item]
236 raise KeyError("No collection with id " + item)
# Directory listing every tag name, one TagDirectory per tag.
238 class TagsDirectory(Directory):
239 '''A special directory that contains as subdirectories all tags visible to the user.'''
241 def __init__(self, parent_inode, inodes, api, poll_time=60):
242 super(TagsDirectory, self).__init__(parent_inode)
# Any event on a link object invalidates this directory and its subdirectories.
246 arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
249 self._poll_time = poll_time
251 def invalidate(self):
253 super(TagsDirectory, self).invalidate()
254 for a in self._entries:
255 self._entries[a].invalidate()
# NOTE(review): only one page of results is used if links.list paginates —
# confirm the API's default limit.
258 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
260 self.merge(tags['items'],
# NOTE(review): `i` is an item dict (the next lambda reads i['name']), while each
# TagDirectory is built with tag=i['name'] and exposes it as self.tag (presumably
# set in missing lines 272-275). Comparing a.tag to the whole dict is never true,
# so every refresh discards and recreates all tag directories. This presumably
# should be `a.tag == i['name']`.
262 lambda a, i: a.tag == i,
263 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
# Directory listing the collections tagged with one tag name.
265 class TagDirectory(Directory):
266 '''A special directory that contains as subdirectories all collections visible
267 to the user that are tagged with a particular tag.
270 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
271 super(TagDirectory, self).__init__(parent_inode)
276 self._poll_time = poll_time
# Queries tag links with this tag's name whose head is a collection; each
# head_uuid becomes a CollectionDirectory entry named by that uuid.
279 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
280 ['name', '=', self.tag],
281 ['head_uuid', 'is_a', 'arvados#collection']],
282 select=['head_uuid']).execute()
283 self.merge(taggedcollections['items'],
284 lambda i: i['head_uuid'],
285 lambda a, i: a.collection_locator == i['head_uuid'],
286 lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
# Directory listing every visible group, one GroupDirectory per group.
289 class GroupsDirectory(Directory):
290 '''A special directory that contains as subdirectories all groups visible to the user.'''
292 def __init__(self, parent_inode, inodes, api, poll_time=60):
293 super(GroupsDirectory, self).__init__(parent_inode)
# NOTE(review): with an empty filter list, presumably every event on the
# subscription invalidates the whole groups tree — confirm this is intended
# rather than filtering by object type.
297 arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
300 self._poll_time = poll_time
302 def invalidate(self):
304 super(GroupsDirectory, self).invalidate()
305 for a in self._entries:
306 self._entries[a].invalidate()
309 groups = self.api.groups().list().execute()
310 self.merge(groups['items'],
312 lambda a, i: a.uuid == i['uuid'],
313 lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
# Directory showing a group's contents: collections, subgroups, and other objects as JSON files.
316 class GroupDirectory(Directory):
317 '''A special directory that contains the contents of a group.'''
# Despite its name, `uuid` is the group's record (a dict); only its 'uuid' field is kept.
319 def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
320 super(GroupDirectory, self).__init__(parent_inode)
323 self.uuid = uuid['uuid']
325 self._poll_time = poll_time
327 def invalidate(self):
329 super(GroupDirectory, self).invalidate()
330 for a in self._entries:
331 self._entries[a].invalidate()
# Picks the entry type from the uuid's shape: collection (hash+size pattern),
# group ('-j7d0g-' infix), or any other object rendered as a JSON file.
# re.match anchors only at the start of the string.
# NOTE(review): collections get self.inode as their parent, but subgroups and
# ObjectFiles get self.parent_inode. opendir builds '..' from parent_inode, so
# '..' inside a subgroup skips this group. These presumably should all use
# self.inode — confirm.
333 def createDirectory(self, i):
334 if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
335 return CollectionDirectory(self.inode, self.inodes, i['uuid'])
336 elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
337 return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
338 elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
339 return ObjectFile(self.parent_inode, i)
# With include_linked=True the response carries 'links'; each link's name
# becomes the display name of its head object.
343 contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
345 for a in contents['links']:
346 links[a['head_uuid']] = a['name']
349 if i['uuid'] in links:
350 return links[i['uuid']]
# An existing entry is kept only if it is the same kind of object with the
# same uuid; ObjectFiles are additionally recreated once stale.
355 if isinstance(a, CollectionDirectory):
356 return a.collection_locator == i['uuid']
357 elif isinstance(a, GroupDirectory):
358 return a.uuid == i['uuid']
359 elif isinstance(a, ObjectFile):
360 return a.uuid == i['uuid'] and not a.stale()
363 self.merge(contents['items'],
366 self.createDirectory)
# Pairs an open-handle number with what was opened.
369 class FileHandle(object):
370 '''Connects a numeric file handle to a File or Directory object that has
371 been opened by the client.'''
# The attribute assignments (lines 374-375) are not in this listing. Operations
# reads handle.entry: a File for open(), and a list of (name, object) pairs
# built by opendir() for directories.
373 def __init__(self, fh, entry):
# Registry mapping inode numbers to File/Directory objects.
378 class Inodes(object):
379 '''Manage the set of inodes. This is the mapping from a numeric id
380 to a concrete File or Directory object'''
# Numbering starts at llfuse.ROOT_INODE, so the first entry added becomes the root.
384 self._counter = llfuse.ROOT_INODE
386 def __getitem__(self, item):
387 return self._entries[item]
389 def __setitem__(self, key, item):
390 self._entries[key] = item
393 return self._entries.iterkeys()
396 return self._entries.items()
398 def __contains__(self, k):
399 return k in self._entries
401 def add_entry(self, entry):
# Assigns the current counter value as the entry's inode and registers it.
# The increment and return (lines 404-405) are not visible; callers store the
# result as a directory entry, so it presumably returns `entry` — confirm.
402 entry.inode = self._counter
403 self._entries[entry.inode] = entry
# Invalidates the kernel's cache for the inode, then forgets it
# (KeyError if it was already removed).
407 def del_entry(self, entry):
408 llfuse.invalidate_inode(entry.inode)
409 del self._entries[entry.inode]
# llfuse request handlers exposing the read-only inode tree.
411 class Operations(llfuse.Operations):
412 '''This is the main interface with llfuse. The methods on this object are
413 called by llfuse threads to service FUSE events to query and read from
416 llfuse has its own global lock which is acquired before calling a request handler,
417 so request handlers do not run concurrently unless the lock is explicitly released
418 with llfuse.lock_released.'''
420 def __init__(self, uid, gid, debug=False):
421 super(Operations, self).__init__()
# Configures the root logger process-wide when debug output is requested.
424 logging.basicConfig(level=logging.DEBUG)
425 logging.info("arv-mount debug enabled")
427 self.inodes = Inodes()
431 # dict of inode to filehandle
432 self._filehandles = {}
433 self._filehandles_counter = 1
435 # Other threads that need to wait until the fuse driver
436 # is fully initialized should wait() on this event object.
437 self.initlock = threading.Event()
440 # Allow threads that are waiting for the driver to be finished
441 # initializing to continue
444 def access(self, inode, mode, ctx):
447 def getattr(self, inode):
448 if inode not in self.inodes:
449 raise llfuse.FUSEError(errno.ENOENT)
451 e = self.inodes[inode]
453 entry = llfuse.EntryAttributes()
# The kernel may cache names and attributes for 300 seconds.
456 entry.entry_timeout = 300
457 entry.attr_timeout = 300
# Everything is read-only; directories additionally get execute (search) bits.
459 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
460 if isinstance(e, Directory):
461 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
463 entry.st_mode |= stat.S_IFREG
466 entry.st_uid = self.uid
467 entry.st_gid = self.gid
470 entry.st_size = e.size()
472 entry.st_blksize = 1024
# NOTE(review): Linux stat(2) defines st_blocks in 512-byte units regardless of
# st_blksize, so dividing by 1024 reports half the footprint (e.g. to du).
# '/' here is Python 2 integer division.
473 entry.st_blocks = e.size()/1024
474 if e.size()/1024 != 0:
482 def lookup(self, parent_inode, name):
483 logging.debug("arv-mount lookup: parent_inode %i name %s", parent_inode, name)
489 if parent_inode in self.inodes:
490 p = self.inodes[parent_inode]
# Presumably the '..' case (its condition, line 491, is not shown).
492 inode = p.parent_inode
494 inode = p[name].inode
497 return self.getattr(inode)
499 raise llfuse.FUSEError(errno.ENOENT)
501 def open(self, inode, flags):
502 if inode in self.inodes:
503 p = self.inodes[inode]
505 raise llfuse.FUSEError(errno.ENOENT)
# Read-only filesystem: any write access mode is refused.
507 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
508 raise llfuse.FUSEError(errno.EROFS)
510 if isinstance(p, Directory):
511 raise llfuse.FUSEError(errno.EISDIR)
513 fh = self._filehandles_counter
514 self._filehandles_counter += 1
515 self._filehandles[fh] = FileHandle(fh, p)
518 def read(self, fh, off, size):
519 logging.debug("arv-mount read %i %i %i", fh, off, size)
520 if fh in self._filehandles:
521 handle = self._filehandles[fh]
523 raise llfuse.FUSEError(errno.EBADF)
# The global llfuse lock is released while reading so other requests can proceed.
526 with llfuse.lock_released:
527 return handle.entry.readfrom(off, size)
529 raise llfuse.FUSEError(errno.EIO)
531 def release(self, fh):
532 if fh in self._filehandles:
533 del self._filehandles[fh]
535 def opendir(self, inode):
536 logging.debug("arv-mount opendir: inode %i", inode)
538 if inode in self.inodes:
539 p = self.inodes[inode]
541 raise llfuse.FUSEError(errno.ENOENT)
543 if not isinstance(p, Directory):
544 raise llfuse.FUSEError(errno.ENOTDIR)
546 fh = self._filehandles_counter
547 self._filehandles_counter += 1
548 if p.parent_inode in self.inodes:
549 parent = self.inodes[p.parent_inode]
551 raise llfuse.FUSEError(errno.EIO)
# Snapshots the listing ('.', '..', then the directory's items) at open time.
553 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
556 def readdir(self, fh, off):
557 logging.debug("arv-mount readdir: fh %i off %i", fh, off)
559 if fh in self._filehandles:
560 handle = self._filehandles[fh]
562 raise llfuse.FUSEError(errno.EBADF)
564 logging.debug("arv-mount handle.entry %s", handle.entry)
# Yields (name, attributes, next offset) for each snapshot entry whose inode
# still exists; the initialisation and increment of `e` are not in this listing.
567 while e < len(handle.entry):
568 if handle.entry[e][1].inode in self.inodes:
569 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
# NOTE(review): unlike release(), this does not check membership first, so an
# unknown fh raises KeyError instead of being ignored.
572 def releasedir(self, fh):
573 del self._filehandles[fh]
576 st = llfuse.StatvfsData()
577 st.f_bsize = 1024 * 1024
590 # The llfuse documentation recommends only overriding functions that
591 # are actually implemented, as the default implementation will raise ENOSYS.
592 # However, there is a bug in the llfuse default implementation of create()
593 # "create() takes exactly 5 positional arguments (6 given)" which will crash
595 # The workaround is to implement it with the proper number of parameters,
596 # and then everything works out.
597 def create(self, p1, p2, p3, p4, p5):
598 raise llfuse.FUSEError(errno.EROFS)