2 # FUSE driver for Arvados Keep
20 from llfuse import FUSEError
22 class FreshBase(object):
23 '''Base class for maintaining fresh/stale state to determine when to update.'''
27 self._last_update = time()
30 # Mark the value as stale
34 # Test if the entries dict is stale
39 return (self._last_update + self._poll_time) < time()
44 self._last_update = time()
47 class File(FreshBase):
48 '''Base for file objects.'''
50 def __init__(self, parent_inode):
51 super(File, self).__init__()
53 self.parent_inode = parent_inode
58 def readfrom(self, off, size):
62 class StreamReaderFile(File):
63 '''Wraps a StreamFileReader as a file.'''
65 def __init__(self, parent_inode, reader):
66 super(StreamReaderFile, self).__init__(parent_inode)
70 return self.reader.size()
def readfrom(self, off, size):
    '''Return whatever the wrapped reader's readfrom(off, size) returns.'''
    reader = self.reader
    return reader.readfrom(off, size)
79 class ObjectFile(File):
80 '''Wraps a dict as a serialized json object.'''
def __init__(self, parent_inode, contents):
    '''Keep the dict given in 'contents', remember its 'uuid' value, and
    render the dict once as indented, key-sorted JSON text.

    parent_inode is handed unchanged to the File constructor.'''
    super(ObjectFile, self).__init__(parent_inode)
    self.contentsdict = contents
    self.uuid = contents['uuid']
    # Serialized here once; the text is kept in self.contents.
    self.contents = json.dumps(contents, indent=4, sort_keys=True)
89 return len(self.contents)
def readfrom(self, off, size):
    '''Return the slice of the serialized contents that begins at offset
    'off' and is at most 'size' items long.'''
    end = off + size
    return self.contents[off:end]
95 class Directory(FreshBase):
96 '''Generic directory object, backed by a dict.
97 Consists of a set of entries with the key representing the filename
98 and the value referencing a File or Directory object.
101 def __init__(self, parent_inode):
102 super(Directory, self).__init__()
104 '''parent_inode is the integer inode number'''
106 if not isinstance(parent_inode, int):
107 raise Exception("parent_inode should be an int")
108 self.parent_inode = parent_inode
# Overridden by subclasses to implement logic to update the entries dict
112 # when the directory is stale
116 # Only used when computing the size of the disk footprint of the directory
121 def checkupdate(self):
125 except apiclient.errors.HttpError as e:
128 def __getitem__(self, item):
130 return self._entries[item]
134 return self._entries.items()
138 return self._entries.iterkeys()
140 def __contains__(self, k):
142 return k in self._entries
144 def merge(self, items, fn, same, new_entry):
145 '''Helper method for updating the contents of the directory.
147 items: array with new directory contents
149 fn: function to take an entry in 'items' and return the desired file or
152 same: function to compare an existing entry with an entry in the items
153 list to determine whether to keep the existing entry.
155 new_entry: function to create a new directory entry from array entry.
158 oldentries = self._entries
162 if n in oldentries and same(oldentries[n], i):
163 self._entries[n] = oldentries[n]
166 self._entries[n] = self.inodes.add_entry(new_entry(i))
168 llfuse.invalidate_entry(self.inode, str(n))
169 self.inodes.del_entry(oldentries[n])
173 class CollectionDirectory(Directory):
174 '''Represents the root of a directory tree holding a collection.'''
176 def __init__(self, parent_inode, inodes, collection_locator):
177 super(CollectionDirectory, self).__init__(parent_inode)
179 self.collection_locator = collection_locator
182 return i['uuid'] == self.collection_locator
186 collection = arvados.CollectionReader(self.collection_locator)
187 for s in collection.all_streams():
189 for part in s.name().split('/'):
190 if part != '' and part != '.':
191 if part not in cwd._entries:
192 cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
193 cwd = cwd._entries[part]
194 for k, v in s.files().items():
195 cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
198 except Exception as detail:
199 print("%s: error: %s" % (self.collection_locator,detail) )
201 class MagicDirectory(Directory):
202 '''A special directory that logically contains the set of all extant keep
203 locators. When a file is referenced by lookup(), it is tested to see if it
204 is a valid keep locator to a manifest, and if so, loads the manifest
205 contents as a subdirectory of this directory with the locator as the
206 directory name. Since querying a list of all extant keep locators is
207 impractical, only collections that have already been accessed are visible
211 def __init__(self, parent_inode, inodes):
212 super(MagicDirectory, self).__init__(parent_inode)
215 def __contains__(self, k):
216 if k in self._entries:
219 if arvados.Keep.get(k):
223 except Exception as e:
224 #print 'exception keep', e
def __getitem__(self, item):
    '''Return the entry named 'item'.

    When no entry with that name exists yet, a CollectionDirectory for
    'item' is created, registered through self.inodes.add_entry(), and
    stored under that name before being returned.'''
    entries = self._entries
    if item in entries:
        return entries[item]
    created = CollectionDirectory(self.inode, self.inodes, item)
    entries[item] = self.inodes.add_entry(created)
    return entries[item]
233 class TagsDirectory(Directory):
234 '''A special directory that contains as subdirectories all tags visible to the user.'''
236 def __init__(self, parent_inode, inodes, api, poll_time=60):
237 super(TagsDirectory, self).__init__(parent_inode)
241 arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
244 self._poll_time = poll_time
246 def invalidate(self):
248 super(TagsDirectory, self).invalidate()
249 for a in self._entries:
250 self._entries[a].invalidate()
253 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
254 self.merge(tags['items'],
256 lambda a, i: a.tag == i,
257 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
259 class TagDirectory(Directory):
260 '''A special directory that contains as subdirectories all collections visible
261 to the user that are tagged with a particular tag.
264 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
265 super(TagDirectory, self).__init__(parent_inode)
270 self._poll_time = poll_time
273 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
274 ['name', '=', self.tag],
275 ['head_uuid', 'is_a', 'arvados#collection']],
276 select=['head_uuid']).execute()
277 self.merge(taggedcollections['items'],
278 lambda i: i['head_uuid'],
279 lambda a, i: a.collection_locator == i['head_uuid'],
280 lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
283 class GroupsDirectory(Directory):
284 '''A special directory that contains as subdirectories all groups visible to the user.'''
286 def __init__(self, parent_inode, inodes, api, poll_time=60):
287 super(GroupsDirectory, self).__init__(parent_inode)
291 arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
294 self._poll_time = poll_time
296 def invalidate(self):
298 super(GroupsDirectory, self).invalidate()
299 for a in self._entries:
300 self._entries[a].invalidate()
303 groups = self.api.groups().list().execute()
304 self.merge(groups['items'],
306 lambda a, i: a.uuid == i['uuid'],
307 lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
310 class GroupDirectory(Directory):
311 '''A special directory that contains the contents of a group.'''
313 def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
314 super(GroupDirectory, self).__init__(parent_inode)
317 self.uuid = uuid['uuid']
319 self._poll_time = poll_time
321 def invalidate(self):
323 super(GroupDirectory, self).invalidate()
324 for a in self._entries:
325 self._entries[a].invalidate()
def createDirectory(self, i):
    '''Build the entry object for one item listed in this group.

    i: dict describing the item; its 'uuid' value selects the entry type:
       - 32 hex digits, '+', digits        -> CollectionDirectory
       - xxxxx-j7d0g-xxxxxxxxxxxxxxx       -> GroupDirectory
       - xxxxx-xxxxx-xxxxxxxxxxxxxxx       -> ObjectFile

    Every entry is created with this directory's own inode as its parent,
    because this directory is the one that lists it.'''
    uuid = i['uuid']
    if re.match(r'[0-9a-f]{32}\+\d+', uuid):
        return CollectionDirectory(self.inode, self.inodes, uuid)
    elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', uuid):
        # The parent must be self.inode, not self.parent_inode: '..' is
        # resolved through parent_inode, so passing our own parent would
        # make '..' inside the nested group skip over this directory.
        return GroupDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
    elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', uuid):
        # Same reasoning: the file lives in this directory.
        return ObjectFile(self.inode, i)
337 contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
339 for a in contents['links']:
340 links[a['head_uuid']] = a['name']
343 if i['uuid'] in links:
344 return links[i['uuid']]
349 if isinstance(a, CollectionDirectory):
350 return a.collection_locator == i['uuid']
351 elif isinstance(a, GroupDirectory):
352 return a.uuid == i['uuid']
353 elif isinstance(a, ObjectFile):
354 return a.uuid == i['uuid'] and not a.stale()
357 self.merge(contents['items'],
360 self.createDirectory)
363 class FileHandle(object):
364 '''Connects a numeric file handle to a File or Directory object that has
365 been opened by the client.'''
367 def __init__(self, fh, entry):
372 class Inodes(object):
373 '''Manage the set of inodes. This is the mapping from a numeric id
374 to a concrete File or Directory object'''
378 self._counter = llfuse.ROOT_INODE
380 def __getitem__(self, item):
381 return self._entries[item]
383 def __setitem__(self, key, item):
384 self._entries[key] = item
387 return self._entries.iterkeys()
390 return self._entries.items()
392 def __contains__(self, k):
393 return k in self._entries
395 def add_entry(self, entry):
396 entry.inode = self._counter
397 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Drop 'entry' from the table, after asking llfuse to invalidate
    its inode.'''
    inode = entry.inode
    llfuse.invalidate_inode(inode)
    del self._entries[inode]
405 class Operations(llfuse.Operations):
406 '''This is the main interface with llfuse. The methods on this object are
407 called by llfuse threads to service FUSE events to query and read from
410 llfuse has its own global lock which is acquired before calling a request handler,
411 so request handlers do not run concurrently unless the lock is explicitly released
412 with llfuse.lock_released.'''
414 def __init__(self, uid, gid):
415 super(Operations, self).__init__()
417 self.inodes = Inodes()
421 # dict of inode to filehandle
422 self._filehandles = {}
423 self._filehandles_counter = 1
425 # Other threads that need to wait until the fuse driver
426 # is fully initialized should wait() on this event object.
427 self.initlock = threading.Event()
# Let threads that are waiting for the driver to finish initializing
# continue.
434 def access(self, inode, mode, ctx):
437 def getattr(self, inode):
438 if inode not in self.inodes:
439 raise llfuse.FUSEError(errno.ENOENT)
441 e = self.inodes[inode]
443 entry = llfuse.EntryAttributes()
446 entry.entry_timeout = 300
447 entry.attr_timeout = 300
449 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
450 if isinstance(e, Directory):
451 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
453 entry.st_mode |= stat.S_IFREG
456 entry.st_uid = self.uid
457 entry.st_gid = self.gid
460 entry.st_size = e.size()
462 entry.st_blksize = 1024
463 entry.st_blocks = e.size()/1024
464 if e.size()/1024 != 0:
472 def lookup(self, parent_inode, name):
473 #print "lookup: parent_inode", parent_inode, "name", name
479 if parent_inode in self.inodes:
480 p = self.inodes[parent_inode]
482 inode = p.parent_inode
484 inode = p[name].inode
487 return self.getattr(inode)
489 raise llfuse.FUSEError(errno.ENOENT)
491 def open(self, inode, flags):
492 if inode in self.inodes:
493 p = self.inodes[inode]
495 raise llfuse.FUSEError(errno.ENOENT)
497 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
498 raise llfuse.FUSEError(errno.EROFS)
500 if isinstance(p, Directory):
501 raise llfuse.FUSEError(errno.EISDIR)
503 fh = self._filehandles_counter
504 self._filehandles_counter += 1
505 self._filehandles[fh] = FileHandle(fh, p)
508 def read(self, fh, off, size):
509 #print "read", fh, off, size
510 if fh in self._filehandles:
511 handle = self._filehandles[fh]
513 raise llfuse.FUSEError(errno.EBADF)
516 with llfuse.lock_released:
517 return handle.entry.readfrom(off, size)
519 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the file handle numbered 'fh'; unknown handle numbers
    are ignored.'''
    self._filehandles.pop(fh, None)
525 def opendir(self, inode):
526 #print "opendir: inode", inode
528 if inode in self.inodes:
529 p = self.inodes[inode]
531 raise llfuse.FUSEError(errno.ENOENT)
533 if not isinstance(p, Directory):
534 raise llfuse.FUSEError(errno.ENOTDIR)
536 fh = self._filehandles_counter
537 self._filehandles_counter += 1
538 if p.parent_inode in self.inodes:
539 parent = self.inodes[p.parent_inode]
541 raise llfuse.FUSEError(errno.EIO)
543 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
546 def readdir(self, fh, off):
547 #print "readdir: fh", fh, "off", off
549 if fh in self._filehandles:
550 handle = self._filehandles[fh]
552 raise llfuse.FUSEError(errno.EBADF)
554 #print "handle.entry", handle.entry
557 while e < len(handle.entry):
558 if handle.entry[e][1].inode in self.inodes:
559 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Forget the directory handle numbered 'fh'.

    Unknown handle numbers are ignored, the same way release() treats
    them, so a stray or repeated releasedir does not raise KeyError out
    of the request handler.'''
    self._filehandles.pop(fh, None)
566 st = llfuse.StatvfsData()
567 st.f_bsize = 1024 * 1024
580 # The llfuse documentation recommends only overloading functions that
581 # are actually implemented, as the default implementation will raise ENOSYS.
582 # However, there is a bug in the llfuse default implementation of create()
583 # "create() takes exactly 5 positional arguments (6 given)" which will crash
585 # The workaround is to implement it with the proper number of parameters,
586 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Refuse every create request by raising FUSEError(EROFS).'''
    read_only = llfuse.FUSEError(errno.EROFS)
    raise read_only