2 # FUSE driver for Arvados Keep
20 from llfuse import FUSEError
22 class FreshBase(object):
23 '''Base class for maintaining fresh/stale state to determine when to update.'''
27 self._last_update = time()
30 # Mark the value as stale
34 # Test if the entries dict is stale
39 return (self._last_update + self._poll_time) < time()
44 self._last_update = time()
47 class File(FreshBase):
48 '''Base for file objects.'''
50 def __init__(self, parent_inode):
51 super(File, self).__init__()
53 self.parent_inode = parent_inode
58 def readfrom(self, off, size):
62 class StreamReaderFile(File):
63 '''Wraps a StreamFileReader as a file.'''
65 def __init__(self, parent_inode, reader):
66 super(StreamReaderFile, self).__init__(parent_inode)
70 return self.reader.size()
def readfrom(self, off, size):
    '''Read from the wrapped reader.

    off and size are forwarded unchanged to self.reader.readfrom(), and
    whatever that call returns is handed back to the caller.
    '''
    reader = self.reader
    data = reader.readfrom(off, size)
    return data
79 class ObjectFile(File):
80 '''Wraps a dict as a serialized json object.'''
def __init__(self, parent_inode, contents):
    '''Keep the dict and its pretty-printed JSON rendering.

    parent_inode: handed straight to the File constructor.
    contents: the dict to expose; it must have a 'uuid' key, otherwise
        the lookup below raises KeyError.
    '''
    super(ObjectFile, self).__init__(parent_inode)
    self.contentsdict = contents
    self.uuid = contents['uuid']
    # Serialize once here; size() and readfrom() work on this string.
    serialized = json.dumps(contents, indent=4, sort_keys=True)
    self.contents = serialized
89 return len(self.contents)
def readfrom(self, off, size):
    '''Return the part of the serialized contents that starts at off.

    The result is the slice [off, off + size) of self.contents, so it is
    shorter than size (possibly empty) when the range runs past the end.
    '''
    end = off + size
    return self.contents[off:end]
95 class Directory(FreshBase):
96 '''Generic directory object, backed by a dict.
97 Consists of a set of entries with the key representing the filename
98 and the value referencing a File or Directory object.
101 def __init__(self, parent_inode):
102 super(Directory, self).__init__()
104 '''parent_inode is the integer inode number'''
106 if not isinstance(parent_inode, int):
107 raise Exception("parent_inode should be an int")
108 self.parent_inode = parent_inode
111 # Overridden by subclasses to implement logic to update the entries dict
112 # when the directory is stale
116 # Only used when computing the size of the disk footprint of the directory
121 def checkupdate(self):
125 except apiclient.errors.HttpError as e:
128 def __getitem__(self, item):
130 return self._entries[item]
134 return self._entries.items()
138 return self._entries.iterkeys()
140 def __contains__(self, k):
142 return k in self._entries
144 def merge(self, items, fn, same, new_entry):
145 '''Helper method for updating the contents of the directory.
147 items: array with new directory contents
149 fn: function to take an entry in 'items' and return the desired file or
152 same: function to compare an existing entry with an entry in the items
153 list to determine whether to keep the existing entry.
155 new_entry: function to create a new directory entry from array entry.
158 oldentries = self._entries
162 if n in oldentries and same(oldentries[n], i):
163 self._entries[n] = oldentries[n]
166 self._entries[n] = self.inodes.add_entry(new_entry(i))
168 llfuse.invalidate_entry(self.inode, str(n))
169 self.inodes.del_entry(oldentries[n])
173 class CollectionDirectory(Directory):
174 '''Represents the root of a directory tree holding a collection.'''
176 def __init__(self, parent_inode, inodes, collection_locator):
177 super(CollectionDirectory, self).__init__(parent_inode)
179 self.collection_locator = collection_locator
182 return i['uuid'] == self.collection_locator
185 collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
186 for s in collection.all_streams():
188 for part in s.name().split('/'):
189 if part != '' and part != '.':
190 if part not in cwd._entries:
191 cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
192 cwd = cwd._entries[part]
193 for k, v in s.files().items():
194 cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
198 class MagicDirectory(Directory):
199 '''A special directory that logically contains the set of all extant keep
200 locators. When a file is referenced by lookup(), it is tested to see if it
201 is a valid keep locator to a manifest, and if so, loads the manifest
202 contents as a subdirectory of this directory with the locator as the
203 directory name. Since querying a list of all extant keep locators is
204 impractical, only collections that have already been accessed are visible
208 def __init__(self, parent_inode, inodes):
209 super(MagicDirectory, self).__init__(parent_inode)
212 def __contains__(self, k):
213 if k in self._entries:
216 if arvados.Keep.get(k):
220 except Exception as e:
221 #print 'exception keep', e
def __getitem__(self, item):
    '''Return the entry named item, creating it on first access.

    When item is not yet present in self._entries, a CollectionDirectory
    for it is built with this directory's inode as parent, registered via
    self.inodes.add_entry(), and stored under that name.
    '''
    entries = self._entries
    if item in entries:
        return entries[item]
    collection_dir = CollectionDirectory(self.inode, self.inodes, item)
    entries[item] = self.inodes.add_entry(collection_dir)
    return entries[item]
230 class TagsDirectory(Directory):
231 '''A special directory that contains as subdirectories all tags visible to the user.'''
233 def __init__(self, parent_inode, inodes, api, poll_time=60):
234 super(TagsDirectory, self).__init__(parent_inode)
238 arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
241 self._poll_time = poll_time
243 def invalidate(self):
245 super(TagsDirectory, self).invalidate()
246 for a in self._entries:
247 self._entries[a].invalidate()
250 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
251 self.merge(tags['items'],
253 lambda a, i: a.tag == i,
254 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
256 class TagDirectory(Directory):
257 '''A special directory that contains as subdirectories all collections visible
258 to the user that are tagged with a particular tag.
261 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
262 super(TagDirectory, self).__init__(parent_inode)
267 self._poll_time = poll_time
270 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
271 ['name', '=', self.tag],
272 ['head_uuid', 'is_a', 'arvados#collection']],
273 select=['head_uuid']).execute()
274 self.merge(taggedcollections['items'],
275 lambda i: i['head_uuid'],
276 lambda a, i: a.collection_locator == i['head_uuid'],
277 lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
280 class GroupsDirectory(Directory):
281 '''A special directory that contains as subdirectories all groups visible to the user.'''
283 def __init__(self, parent_inode, inodes, api, poll_time=60):
284 super(GroupsDirectory, self).__init__(parent_inode)
288 arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
291 self._poll_time = poll_time
293 def invalidate(self):
295 super(GroupsDirectory, self).invalidate()
296 for a in self._entries:
297 self._entries[a].invalidate()
300 groups = self.api.groups().list().execute()
301 self.merge(groups['items'],
303 lambda a, i: a.uuid == i['uuid'],
304 lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
307 class GroupDirectory(Directory):
308 '''A special directory that contains the contents of a group.'''
310 def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
311 super(GroupDirectory, self).__init__(parent_inode)
314 self.uuid = uuid['uuid']
316 self._poll_time = poll_time
318 def invalidate(self):
320 super(GroupDirectory, self).invalidate()
321 for a in self._entries:
322 self._entries[a].invalidate()
def createDirectory(self, i):
    '''Build the entry object for one item of this group's contents.

    i: dict describing the item; its 'uuid' value selects the entry type.

    Returns a CollectionDirectory when the uuid starts with a collection
    locator (32 hex digits, '+', digits), a GroupDirectory when it starts
    with a group uuid (type infix 'j7d0g'), an ObjectFile when it starts
    with any other uuid-shaped value, and None when nothing matches.
    '''
    uuid = i['uuid']
    # Every entry created here lives inside this directory, so its parent
    # is self.inode.  Passing self.parent_inode would attach the child to
    # this directory's own parent, making '..' skip a level.
    if re.match(r'[0-9a-f]{32}\+\d+', uuid):
        return CollectionDirectory(self.inode, self.inodes, uuid)
    elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', uuid):
        return GroupDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
    elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', uuid):
        return ObjectFile(self.inode, i)
    return None
334 contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
336 for a in contents['links']:
337 links[a['head_uuid']] = a['name']
340 if i['uuid'] in links:
341 return links[i['uuid']]
346 if isinstance(a, CollectionDirectory):
347 return a.collection_locator == i['uuid']
348 elif isinstance(a, ObjectFile):
349 return a.uuid == i['uuid'] and not a.stale()
352 self.merge(contents['items'],
355 self.createDirectory)
358 class FileHandle(object):
359 '''Connects a numeric file handle to a File or Directory object that has
360 been opened by the client.'''
362 def __init__(self, fh, entry):
367 class Inodes(object):
368 '''Manage the set of inodes. This is the mapping from a numeric id
369 to a concrete File or Directory object'''
373 self._counter = llfuse.ROOT_INODE
375 def __getitem__(self, item):
376 return self._entries[item]
378 def __setitem__(self, key, item):
379 self._entries[key] = item
382 return self._entries.iterkeys()
385 return self._entries.items()
387 def __contains__(self, k):
388 return k in self._entries
390 def add_entry(self, entry):
391 entry.inode = self._counter
392 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Forget entry.

    Asks llfuse to invalidate the entry's inode, then removes the entry
    from the inode table.  Raises KeyError if the inode is not registered.
    '''
    inode = entry.inode
    llfuse.invalidate_inode(inode)
    del self._entries[inode]
400 class Operations(llfuse.Operations):
401 '''This is the main interface with llfuse. The methods on this object are
402 called by llfuse threads to service FUSE events to query and read from
405 llfuse has its own global lock which is acquired before calling a request handler,
406 so request handlers do not run concurrently unless the lock is explicitly released
407 with llfuse.lock_released.'''
409 def __init__(self, uid, gid):
410 super(Operations, self).__init__()
412 self.inodes = Inodes()
416 # dict of inode to filehandle
417 self._filehandles = {}
418 self._filehandles_counter = 1
420 # Other threads that need to wait until the fuse driver
421 # is fully initialized should wait() on this event object.
422 self.initlock = threading.Event()
425 # Allow threads that are waiting for the driver to be finished
426 # initializing to continue
429 def access(self, inode, mode, ctx):
432 def getattr(self, inode):
433 if inode not in self.inodes:
434 raise llfuse.FUSEError(errno.ENOENT)
436 e = self.inodes[inode]
438 entry = llfuse.EntryAttributes()
441 entry.entry_timeout = 300
442 entry.attr_timeout = 300
444 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
445 if isinstance(e, Directory):
446 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
448 entry.st_mode |= stat.S_IFREG
451 entry.st_uid = self.uid
452 entry.st_gid = self.gid
455 entry.st_size = e.size()
457 entry.st_blksize = 1024
458 entry.st_blocks = e.size()/1024
459 if e.size()/1024 != 0:
467 def lookup(self, parent_inode, name):
468 #print "lookup: parent_inode", parent_inode, "name", name
474 if parent_inode in self.inodes:
475 p = self.inodes[parent_inode]
477 inode = p.parent_inode
479 inode = p[name].inode
482 return self.getattr(inode)
484 raise llfuse.FUSEError(errno.ENOENT)
486 def open(self, inode, flags):
487 if inode in self.inodes:
488 p = self.inodes[inode]
490 raise llfuse.FUSEError(errno.ENOENT)
492 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
493 raise llfuse.FUSEError(errno.EROFS)
495 if isinstance(p, Directory):
496 raise llfuse.FUSEError(errno.EISDIR)
498 fh = self._filehandles_counter
499 self._filehandles_counter += 1
500 self._filehandles[fh] = FileHandle(fh, p)
503 def read(self, fh, off, size):
504 #print "read", fh, off, size
505 if fh in self._filehandles:
506 handle = self._filehandles[fh]
508 raise llfuse.FUSEError(errno.EBADF)
511 with llfuse.lock_released:
512 return handle.entry.readfrom(off, size)
514 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Drop file handle fh from the open-handle table.

    A handle that is not in the table is ignored.
    '''
    self._filehandles.pop(fh, None)
520 def opendir(self, inode):
521 #print "opendir: inode", inode
523 if inode in self.inodes:
524 p = self.inodes[inode]
526 raise llfuse.FUSEError(errno.ENOENT)
528 if not isinstance(p, Directory):
529 raise llfuse.FUSEError(errno.ENOTDIR)
531 fh = self._filehandles_counter
532 self._filehandles_counter += 1
533 if p.parent_inode in self.inodes:
534 parent = self.inodes[p.parent_inode]
536 raise llfuse.FUSEError(errno.EIO)
538 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
541 def readdir(self, fh, off):
542 #print "readdir: fh", fh, "off", off
544 if fh in self._filehandles:
545 handle = self._filehandles[fh]
547 raise llfuse.FUSEError(errno.EBADF)
549 #print "handle.entry", handle.entry
552 while e < len(handle.entry):
553 if handle.entry[e][1].inode in self.inodes:
554 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Drop directory handle fh from the open-handle table.

    A handle that is not in the table is ignored, matching release(), so
    a stale or repeated releasedir request does not raise KeyError out of
    the request handler.
    '''
    self._filehandles.pop(fh, None)
561 st = llfuse.StatvfsData()
562 st.f_bsize = 1024 * 1024
575 # The llfuse documentation recommends only overloading functions that
576 # are actually implemented, as the default implementation will raise ENOSYS.
577 # However, there is a bug in the llfuse default implementation of create()
578 # "create() takes exactly 5 positional arguments (6 given)" which will crash
580 # The workaround is to implement it with the proper number of parameters,
581 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Refuse file creation with EROFS.

    The five placeholder parameters only give the handler the arity that
    llfuse calls it with; none of them is inspected.
    '''
    read_only = llfuse.FUSEError(errno.EROFS)
    raise read_only