2 # FUSE driver for Arvados Keep
21 from llfuse import FUSEError
23 class FreshBase(object):
24 '''Base class for maintaining fresh/stale state to determine when to update.'''
28 self._last_update = time()
31 # Mark the value as stale
35 # Test if the entries dict is stale
40 return (self._last_update + self._poll_time) < time()
45 self._last_update = time()
48 class File(FreshBase):
49 '''Base for file objects.'''
51 def __init__(self, parent_inode):
52 super(File, self).__init__()
54 self.parent_inode = parent_inode
59 def readfrom(self, off, size):
63 class StreamReaderFile(File):
64 '''Wraps a StreamFileReader as a file.'''
66 def __init__(self, parent_inode, reader):
67 super(StreamReaderFile, self).__init__(parent_inode)
71 return self.reader.size()
def readfrom(self, off, size):
    '''Read from the wrapped reader.

    `off` and `size` are forwarded unchanged to self.reader.readfrom(),
    and whatever that call returns is handed back to the caller.
    '''
    stream = self.reader
    return stream.readfrom(off, size)
80 class ObjectFile(File):
81 '''Wraps a dict as a serialized json object.'''
def __init__(self, parent_inode, contents):
    '''Build a file whose data is the JSON rendering of `contents`.

    parent_inode: passed straight through to File.__init__.
    contents: mapping that must provide a 'uuid' key.  It is kept as
        self.contentsdict and serialized once, here, with json.dumps
        (4-space indent, sorted keys) into self.contents.
    '''
    super(ObjectFile, self).__init__(parent_inode)
    self.contentsdict = contents
    self.uuid = contents['uuid']
    # Serialize up front so later reads only slice a fixed string.
    self.contents = json.dumps(contents, indent=4, sort_keys=True)
90 return len(self.contents)
def readfrom(self, off, size):
    '''Return the slice of self.contents that starts at `off` and spans at most `size` items.'''
    end = off + size
    return self.contents[off:end]
96 class Directory(FreshBase):
97 '''Generic directory object, backed by a dict.
98 Consists of a set of entries with the key representing the filename
99 and the value referencing a File or Directory object.
102 def __init__(self, parent_inode):
103 super(Directory, self).__init__()
105 '''parent_inode is the integer inode number'''
107 if not isinstance(parent_inode, int):
108 raise Exception("parent_inode should be an int")
109 self.parent_inode = parent_inode
112 # Overridden by subclasses to implement logic to update the entries dict
113 # when the directory is stale
117 # Only used when computing the size of the disk footprint of the directory
122 def checkupdate(self):
126 except apiclient.errors.HttpError as e:
129 def __getitem__(self, item):
131 return self._entries[item]
135 return self._entries.items()
139 return self._entries.iterkeys()
141 def __contains__(self, k):
143 return k in self._entries
145 def merge(self, items, fn, same, new_entry):
146 '''Helper method for updating the contents of the directory.
148 items: array with new directory contents
150 fn: function to take an entry in 'items' and return the desired file or
153 same: function to compare an existing entry with an entry in the items
154 list to determine whether to keep the existing entry.
156 new_entry: function to create a new directory entry from array entry.
159 oldentries = self._entries
163 if n in oldentries and same(oldentries[n], i):
164 self._entries[n] = oldentries[n]
167 self._entries[n] = self.inodes.add_entry(new_entry(i))
169 llfuse.invalidate_entry(self.inode, str(n))
170 self.inodes.del_entry(oldentries[n])
174 class CollectionDirectory(Directory):
175 '''Represents the root of a directory tree holding a collection.'''
177 def __init__(self, parent_inode, inodes, collection_locator):
178 super(CollectionDirectory, self).__init__(parent_inode)
180 self.collection_locator = collection_locator
183 return i['uuid'] == self.collection_locator
187 collection = arvados.CollectionReader(self.collection_locator)
188 for s in collection.all_streams():
190 for part in s.name().split('/'):
191 if part != '' and part != '.':
192 if part not in cwd._entries:
193 cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
194 cwd = cwd._entries[part]
195 for k, v in s.files().items():
196 cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
199 except Exception as detail:
200 logging.debug("arv-mount %s: error: %s" % (self.collection_locator,detail))
203 class MagicDirectory(Directory):
204 '''A special directory that logically contains the set of all extant keep
205 locators. When a file is referenced by lookup(), it is tested to see if it
206 is a valid keep locator to a manifest, and if so, loads the manifest
207 contents as a subdirectory of this directory with the locator as the
208 directory name. Since querying a list of all extant keep locators is
209 impractical, only collections that have already been accessed are visible
213 def __init__(self, parent_inode, inodes):
214 super(MagicDirectory, self).__init__(parent_inode)
217 def __contains__(self, k):
218 if k in self._entries:
221 e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, k))
227 except Exception as e:
228 logging.debug('arv-mount exception keep %s', e)
231 def __getitem__(self, item):
233 return self._entries[item]
235 raise KeyError("No collection with id " + item)
237 class TagsDirectory(Directory):
238 '''A special directory that contains as subdirectories all tags visible to the user.'''
240 def __init__(self, parent_inode, inodes, api, poll_time=60):
241 super(TagsDirectory, self).__init__(parent_inode)
245 arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
248 self._poll_time = poll_time
250 def invalidate(self):
252 super(TagsDirectory, self).invalidate()
253 for a in self._entries:
254 self._entries[a].invalidate()
257 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
259 self.merge(tags['items'],
261 lambda a, i: a.tag == i,
262 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
264 class TagDirectory(Directory):
265 '''A special directory that contains as subdirectories all collections visible
266 to the user that are tagged with a particular tag.
269 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
270 super(TagDirectory, self).__init__(parent_inode)
275 self._poll_time = poll_time
278 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
279 ['name', '=', self.tag],
280 ['head_uuid', 'is_a', 'arvados#collection']],
281 select=['head_uuid']).execute()
282 self.merge(taggedcollections['items'],
283 lambda i: i['head_uuid'],
284 lambda a, i: a.collection_locator == i['head_uuid'],
285 lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
288 class GroupsDirectory(Directory):
289 '''A special directory that contains as subdirectories all groups visible to the user.'''
291 def __init__(self, parent_inode, inodes, api, poll_time=60):
292 super(GroupsDirectory, self).__init__(parent_inode)
296 arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
299 self._poll_time = poll_time
301 def invalidate(self):
303 super(GroupsDirectory, self).invalidate()
304 for a in self._entries:
305 self._entries[a].invalidate()
308 groups = self.api.groups().list(
309 filters=[['group_class','=','project']]).execute()
310 self.merge(groups['items'],
312 lambda a, i: a.uuid == i['uuid'],
313 lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
316 class GroupDirectory(Directory):
317 '''A special directory that contains the contents of a group.'''
319 def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
320 super(GroupDirectory, self).__init__(parent_inode)
323 self.uuid = uuid['uuid']
325 self._poll_time = poll_time
327 def invalidate(self):
329 super(GroupDirectory, self).invalidate()
330 for a in self._entries:
331 self._entries[a].invalidate()
333 def createDirectory(self, i):
334 if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
335 return CollectionDirectory(self.inode, self.inodes, i['uuid'])
336 elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
337 return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
338 elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
339 return ObjectFile(self.parent_inode, i)
343 contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
345 for a in contents['links']:
346 links[a['head_uuid']] = a['name']
349 if i['uuid'] in links:
350 return links[i['uuid']]
355 if isinstance(a, CollectionDirectory):
356 return a.collection_locator == i['uuid']
357 elif isinstance(a, GroupDirectory):
358 return a.uuid == i['uuid']
359 elif isinstance(a, ObjectFile):
360 return a.uuid == i['uuid'] and not a.stale()
363 self.merge(contents['items'],
366 self.createDirectory)
369 class FileHandle(object):
370 '''Connects a numeric file handle to a File or Directory object that has
371 been opened by the client.'''
373 def __init__(self, fh, entry):
378 class Inodes(object):
379 '''Manage the set of inodes. This is the mapping from a numeric id
380 to a concrete File or Directory object'''
384 self._counter = llfuse.ROOT_INODE
386 def __getitem__(self, item):
387 return self._entries[item]
389 def __setitem__(self, key, item):
390 self._entries[key] = item
393 return self._entries.iterkeys()
396 return self._entries.items()
398 def __contains__(self, k):
399 return k in self._entries
401 def add_entry(self, entry):
402 entry.inode = self._counter
403 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Remove `entry` from the inode table.

    llfuse.invalidate_inode() is called for entry.inode first; only
    then is the table slot keyed by that inode number deleted.
    '''
    inode = entry.inode
    llfuse.invalidate_inode(inode)
    del self._entries[inode]
411 class Operations(llfuse.Operations):
412 '''This is the main interface with llfuse. The methods on this object are
413 called by llfuse threads to service FUSE events to query and read from
416 llfuse has its own global lock which is acquired before calling a request handler,
417 so request handlers do not run concurrently unless the lock is explicitly released
418 with llfuse.lock_released.'''
420 def __init__(self, uid, gid):
421 super(Operations, self).__init__()
423 self.inodes = Inodes()
427 # dict of file handle number to FileHandle object
428 self._filehandles = {}
429 self._filehandles_counter = 1
431 # Other threads that need to wait until the fuse driver
432 # is fully initialized should wait() on this event object.
433 self.initlock = threading.Event()
436 # Allow threads that are waiting for the driver to finish
437 # initializing to continue
440 def access(self, inode, mode, ctx):
443 def getattr(self, inode):
444 if inode not in self.inodes:
445 raise llfuse.FUSEError(errno.ENOENT)
447 e = self.inodes[inode]
449 entry = llfuse.EntryAttributes()
452 entry.entry_timeout = 300
453 entry.attr_timeout = 300
455 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
456 if isinstance(e, Directory):
457 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
459 entry.st_mode |= stat.S_IFREG
462 entry.st_uid = self.uid
463 entry.st_gid = self.gid
466 entry.st_size = e.size()
468 entry.st_blksize = 1024
469 entry.st_blocks = e.size()/1024
470 if e.size()/1024 != 0:
478 def lookup(self, parent_inode, name):
479 logging.debug("arv-mount lookup: parent_inode %i name %s", parent_inode, name)
485 if parent_inode in self.inodes:
486 p = self.inodes[parent_inode]
488 inode = p.parent_inode
490 inode = p[name].inode
493 return self.getattr(inode)
495 raise llfuse.FUSEError(errno.ENOENT)
497 def open(self, inode, flags):
498 if inode in self.inodes:
499 p = self.inodes[inode]
501 raise llfuse.FUSEError(errno.ENOENT)
503 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
504 raise llfuse.FUSEError(errno.EROFS)
506 if isinstance(p, Directory):
507 raise llfuse.FUSEError(errno.EISDIR)
509 fh = self._filehandles_counter
510 self._filehandles_counter += 1
511 self._filehandles[fh] = FileHandle(fh, p)
514 def read(self, fh, off, size):
515 logging.debug("arv-mount read %i %i %i", fh, off, size)
516 if fh in self._filehandles:
517 handle = self._filehandles[fh]
519 raise llfuse.FUSEError(errno.EBADF)
522 with llfuse.lock_released:
523 return handle.entry.readfrom(off, size)
525 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget file handle `fh`; a handle that is not in the table is silently ignored.'''
    # pop() with a default is the single-lookup form of "delete if present".
    self._filehandles.pop(fh, None)
531 def opendir(self, inode):
532 logging.debug("arv-mount opendir: inode %i", inode)
534 if inode in self.inodes:
535 p = self.inodes[inode]
537 raise llfuse.FUSEError(errno.ENOENT)
539 if not isinstance(p, Directory):
540 raise llfuse.FUSEError(errno.ENOTDIR)
542 fh = self._filehandles_counter
543 self._filehandles_counter += 1
544 if p.parent_inode in self.inodes:
545 parent = self.inodes[p.parent_inode]
547 raise llfuse.FUSEError(errno.EIO)
549 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
552 def readdir(self, fh, off):
553 logging.debug("arv-mount readdir: fh %i off %i", fh, off)
555 if fh in self._filehandles:
556 handle = self._filehandles[fh]
558 raise llfuse.FUSEError(errno.EBADF)
560 logging.debug("arv-mount handle.entry %s", handle.entry)
563 while e < len(handle.entry):
564 if handle.entry[e][1].inode in self.inodes:
565 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Forget directory handle `fh`.

    A handle that is not in the table is ignored, the same way
    release() treats unknown file handles, instead of letting a
    KeyError escape from the request handler.
    '''
    self._filehandles.pop(fh, None)
572 st = llfuse.StatvfsData()
573 st.f_bsize = 1024 * 1024
586 # The llfuse documentation recommends only overloading functions that
587 # are actually implemented, as the default implementation will raise ENOSYS.
588 # However, there is a bug in the llfuse default implementation of create()
589 # "create() takes exactly 5 positional arguments (6 given)" which will crash
591 # The workaround is to implement it with the proper number of parameters,
592 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Refuse every create request.

    The five positional parameters are accepted and ignored; the call
    always raises llfuse.FUSEError carrying errno.EROFS.
    '''
    read_only = llfuse.FUSEError(errno.EROFS)
    raise read_only