2 # FUSE driver for Arvados Keep
20 from llfuse import FUSEError
22 class FreshBase(object):
23 '''Base class for maintaining fresh/stale state to determine when to update.'''
27 self._last_update = time()
30 # Mark the value as stale
34 # Test if the entries dict is stale
39 return (self._last_update + self._poll_time) < time()
44 self._last_update = time()
47 class File(FreshBase):
48 '''Base for file objects.'''
50 def __init__(self, parent_inode):
51 super(File, self).__init__()
53 self.parent_inode = parent_inode
58 def readfrom(self, off, size):
62 class StreamReaderFile(File):
63 '''Wraps a StreamFileReader as a file.'''
65 def __init__(self, parent_inode, reader):
66 super(StreamReaderFile, self).__init__(parent_inode)
70 return self.reader.size()
def readfrom(self, off, size):
    '''Read `size` bytes at offset `off` through the wrapped reader.

    The request is passed unchanged to the wrapped StreamFileReader's own
    readfrom(), and its result is returned as-is.'''
    wrapped = self.reader
    return wrapped.readfrom(off, size)
79 class ObjectFile(File):
80 '''Wraps a dict as a serialized json object.'''
def __init__(self, parent_inode, contents):
    '''Wrap the dict `contents` as a read-only JSON file.

    parent_inode: inode number of the containing directory, passed to File.
    contents: dict that must contain a 'uuid' key. The dict is kept as
    `contentsdict`, its uuid as `uuid`, and a pretty-printed, key-sorted
    JSON serialization as `contents`, which is the data the file serves.'''
    super(ObjectFile, self).__init__(parent_inode)
    self.contentsdict = contents
    # Same dict object as self.contentsdict; a missing key raises KeyError here,
    # before serialization is attempted.
    self.uuid = contents['uuid']
    self.contents = json.dumps(contents, sort_keys=True, indent=4)
89 return len(self.contents)
def readfrom(self, off, size):
    '''Return the part of the serialized JSON text in [off, off + size).

    Standard slicing rules apply, so a range past the end is truncated and a
    start beyond the end yields an empty result.'''
    end = off + size
    return self.contents[off:end]
95 class Directory(FreshBase):
96 '''Generic directory object, backed by a dict.
97 Consists of a set of entries with the key representing the filename
98 and the value referencing a File or Directory object.
101 def __init__(self, parent_inode):
102 super(Directory, self).__init__()
104 '''parent_inode is the integer inode number'''
106 if not isinstance(parent_inode, int):
107 raise Exception("parent_inode should be an int")
108 self.parent_inode = parent_inode
111 # Overriden by subclasses to implement logic to update the entries dict
112 # when the directory is stale
116 # Only used when computing the size of the disk footprint of the directory
121 def checkupdate(self):
125 except apiclient.errors.HttpError as e:
128 def __getitem__(self, item):
130 return self._entries[item]
134 return self._entries.items()
138 return self._entries.iterkeys()
140 def __contains__(self, k):
142 return k in self._entries
144 def merge(self, items, fn, same, new_entry):
145 '''Helper method for updating the contents of the directory.
147 items: array with new directory contents
149 fn: function to take an entry in 'items' and return the desired file or
152 same: function to compare an existing entry with an entry in the items
153 list to determine whether to keep the existing entry.
155 new_entry: function to create a new directory entry from array entry.
158 oldentries = self._entries
162 if n in oldentries and same(oldentries[n], i):
163 self._entries[n] = oldentries[n]
166 self._entries[n] = self.inodes.add_entry(new_entry(i))
168 llfuse.invalidate_entry(self.inode, str(n))
169 self.inodes.del_entry(oldentries[n])
173 class CollectionDirectory(Directory):
174 '''Represents the root of a directory tree holding a collection.'''
176 def __init__(self, parent_inode, inodes, collection_locator):
177 super(CollectionDirectory, self).__init__(parent_inode)
179 self.collection_locator = collection_locator
182 return i['uuid'] == self.collection_locator
185 collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
186 for s in collection.all_streams():
188 for part in s.name().split('/'):
189 if part != '' and part != '.':
190 if part not in cwd._entries:
191 cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
192 cwd = cwd._entries[part]
193 for k, v in s.files().items():
194 cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
198 class MagicDirectory(Directory):
199 '''A special directory that logically contains the set of all extant keep
200 locators. When a file is referenced by lookup(), it is tested to see if it
201 is a valid keep locator to a manifest, and if so, loads the manifest
202 contents as a subdirectory of this directory with the locator as the
203 directory name. Since querying a list of all extant keep locators is
204 impractical, only collections that have already been accessed are visible
208 def __init__(self, parent_inode, inodes):
209 super(MagicDirectory, self).__init__(parent_inode)
212 def __contains__(self, k):
213 if k in self._entries:
216 if arvados.Keep.get(k):
220 except Exception as e:
221 #print 'exception keep', e
def __getitem__(self, item):
    '''Return the subdirectory for collection locator `item`.

    On the first lookup of a locator, a CollectionDirectory is built for it,
    registered with the inode table via add_entry(), and cached in this
    directory's entries. Later lookups return the cached object.'''
    entries = self._entries
    if item in entries:
        return entries[item]
    entries[item] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, item))
    return entries[item]
230 class TagsDirectory(Directory):
231 '''A special directory that contains as subdirectories all tags visible to the user.'''
233 def __init__(self, parent_inode, inodes, api, poll_time=60):
234 super(TagsDirectory, self).__init__(parent_inode)
238 arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
241 self._poll_time = poll_time
243 def invalidate(self):
245 super(TagsDirectory, self).invalidate()
246 for a in self._entries:
247 self._entries[a].invalidate()
250 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
251 self.merge(tags['items'],
253 lambda a, i: a.tag == i,
254 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
256 class TagDirectory(Directory):
257 '''A special directory that contains as subdirectories all collections visible
258 to the user that are tagged with a particular tag.
261 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
262 super(TagDirectory, self).__init__(parent_inode)
267 self._poll_time = poll_time
270 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
271 ['name', '=', self.tag],
272 ['head_uuid', 'is_a', 'arvados#collection']],
273 select=['head_uuid']).execute()
274 self.merge(taggedcollections['items'],
275 lambda i: i['head_uuid'],
276 lambda a, i: a.collection_locator == i['head_uuid'],
277 lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
280 class GroupsDirectory(Directory):
281 '''A special directory that contains as subdirectories all groups visible to the user.'''
283 def __init__(self, parent_inode, inodes, api, poll_time=60):
284 super(GroupsDirectory, self).__init__(parent_inode)
288 arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
291 self._poll_time = poll_time
293 def invalidate(self):
295 super(GroupsDirectory, self).invalidate()
296 for a in self._entries:
297 self._entries[a].invalidate()
300 groups = self.api.groups().list().execute()
301 self.merge(groups['items'],
303 lambda a, i: a.uuid == i['uuid'],
304 lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
307 class GroupDirectory(Directory):
308 '''A special directory that contains the contents of a group.'''
310 def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
311 super(GroupDirectory, self).__init__(parent_inode)
314 self.uuid = uuid['uuid']
316 self._poll_time = poll_time
318 def invalidate(self):
320 super(GroupDirectory, self).invalidate()
321 for a in self._entries:
322 self._entries[a].invalidate()
def createDirectory(self, i):
    '''Build the entry object for one item of this group's contents listing.

    i: item dict from the group contents API call; only i['uuid'] is
    inspected to choose the entry type.

    Returns a CollectionDirectory for a collection locator, a GroupDirectory
    for a group uuid, an ObjectFile for any other Arvados-style uuid, or None
    when the uuid matches none of these patterns.

    Every child is created with this directory's own inode as its parent,
    because the child is listed inside this directory.'''
    uuid = i['uuid']
    if re.match(r'[0-9a-f]{32}\+\d+', uuid):
        return CollectionDirectory(self.inode, self.inodes, uuid)
    elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', uuid):
        # Previously passed self.parent_inode, which made '..' of a nested
        # group resolve to our parent instead of to this directory.
        return GroupDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
    elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', uuid):
        return ObjectFile(self.inode, i)
    # NOTE(review): merge() hands this result straight to inodes.add_entry(),
    # which assigns entry.inode, so None would fail there. Confirm that
    # unmatched uuids cannot reach this point.
    return None
334 contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
336 for a in contents['links']:
337 links[a['head_uuid']] = a['name']
340 if i['uuid'] in links:
341 return links[i['uuid']]
346 if isinstance(a, CollectionDirectory):
347 return a.collection_locator == i['uuid']
348 elif isinstance(a, GroupDirectory):
349 return a.uuid == i['uuid']
350 elif isinstance(a, ObjectFile):
351 return a.uuid == i['uuid'] and not a.stale()
354 self.merge(contents['items'],
357 self.createDirectory)
360 class FileHandle(object):
361 '''Connects a numeric file handle to a File or Directory object that has
362 been opened by the client.'''
364 def __init__(self, fh, entry):
369 class Inodes(object):
370 '''Manage the set of inodes. This is the mapping from a numeric id
371 to a concrete File or Directory object'''
375 self._counter = llfuse.ROOT_INODE
def __getitem__(self, item):
    '''Look up the File or Directory object registered under inode `item`.'''
    entries = self._entries
    return entries[item]
def __setitem__(self, key, item):
    '''Register `item` under inode number `key`, replacing any previous entry.'''
    entries = self._entries
    entries[key] = item
384 return self._entries.iterkeys()
387 return self._entries.items()
def __contains__(self, k):
    '''Report whether inode number `k` is currently registered.'''
    entries = self._entries
    return k in entries
392 def add_entry(self, entry):
393 entry.inode = self._counter
394 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Unregister `entry` from the inode table.

    The entry's inode is first passed to llfuse.invalidate_inode() and then
    removed from this table.'''
    ino = entry.inode
    llfuse.invalidate_inode(ino)
    del self._entries[ino]
402 class Operations(llfuse.Operations):
403 '''This is the main interface with llfuse. The methods on this object are
404 called by llfuse threads to service FUSE events to query and read from
407 llfuse has its own global lock which is acquired before calling a request handler,
408 so request handlers do not run concurrently unless the lock is explicitly released
409 with llfuse.lock_released.'''
411 def __init__(self, uid, gid):
412 super(Operations, self).__init__()
414 self.inodes = Inodes()
418 # dict of inode to filehandle
419 self._filehandles = {}
420 self._filehandles_counter = 1
422 # Other threads that need to wait until the fuse driver
423 # is fully initialized should wait() on this event object.
424 self.initlock = threading.Event()
427 # Allow threads that are waiting for the driver to be finished
428 # initializing to continue
431 def access(self, inode, mode, ctx):
434 def getattr(self, inode):
435 if inode not in self.inodes:
436 raise llfuse.FUSEError(errno.ENOENT)
438 e = self.inodes[inode]
440 entry = llfuse.EntryAttributes()
443 entry.entry_timeout = 300
444 entry.attr_timeout = 300
446 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
447 if isinstance(e, Directory):
448 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
450 entry.st_mode |= stat.S_IFREG
453 entry.st_uid = self.uid
454 entry.st_gid = self.gid
457 entry.st_size = e.size()
459 entry.st_blksize = 1024
460 entry.st_blocks = e.size()/1024
461 if e.size()/1024 != 0:
469 def lookup(self, parent_inode, name):
470 #print "lookup: parent_inode", parent_inode, "name", name
476 if parent_inode in self.inodes:
477 p = self.inodes[parent_inode]
479 inode = p.parent_inode
481 inode = p[name].inode
484 return self.getattr(inode)
486 raise llfuse.FUSEError(errno.ENOENT)
488 def open(self, inode, flags):
489 if inode in self.inodes:
490 p = self.inodes[inode]
492 raise llfuse.FUSEError(errno.ENOENT)
494 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
495 raise llfuse.FUSEError(errno.EROFS)
497 if isinstance(p, Directory):
498 raise llfuse.FUSEError(errno.EISDIR)
500 fh = self._filehandles_counter
501 self._filehandles_counter += 1
502 self._filehandles[fh] = FileHandle(fh, p)
505 def read(self, fh, off, size):
506 #print "read", fh, off, size
507 if fh in self._filehandles:
508 handle = self._filehandles[fh]
510 raise llfuse.FUSEError(errno.EBADF)
513 with llfuse.lock_released:
514 return handle.entry.readfrom(off, size)
516 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the file handle `fh`; handles that are not open are ignored.'''
    self._filehandles.pop(fh, None)
522 def opendir(self, inode):
523 #print "opendir: inode", inode
525 if inode in self.inodes:
526 p = self.inodes[inode]
528 raise llfuse.FUSEError(errno.ENOENT)
530 if not isinstance(p, Directory):
531 raise llfuse.FUSEError(errno.ENOTDIR)
533 fh = self._filehandles_counter
534 self._filehandles_counter += 1
535 if p.parent_inode in self.inodes:
536 parent = self.inodes[p.parent_inode]
538 raise llfuse.FUSEError(errno.EIO)
540 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
543 def readdir(self, fh, off):
544 #print "readdir: fh", fh, "off", off
546 if fh in self._filehandles:
547 handle = self._filehandles[fh]
549 raise llfuse.FUSEError(errno.EBADF)
551 #print "handle.entry", handle.entry
554 while e < len(handle.entry):
555 if handle.entry[e][1].inode in self.inodes:
556 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Forget the directory handle `fh` that opendir() allocated.

    A handle that is not open is ignored rather than raising KeyError, the
    same way release() treats unknown file handles.'''
    self._filehandles.pop(fh, None)
563 st = llfuse.StatvfsData()
564 st.f_bsize = 1024 * 1024
577 # The llfuse documentation recommends only overloading functions that
578 # are actually implemented, as the default implementation will raise ENOSYS.
579 # However, there is a bug in the llfuse default implementation of create()
580 # "create() takes exactly 5 positional arguments (6 given)" which will crash
582 # The workaround is to implement it with the proper number of parameters,
583 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Refuse file creation, because this filesystem is read-only (EROFS).

    This method is defined only to work around the argument-count bug in
    llfuse's default create(). The parameters are accepted and ignored.'''
    read_only = errno.EROFS
    raise llfuse.FUSEError(read_only)