2 # FUSE driver for Arvados Keep
20 _logger = logging.getLogger('arvados.arvados_fuse')
23 from llfuse import FUSEError
25 class FreshBase(object):
26 '''Base class for maintaining fresh/stale state to determine when to update.'''
30 self._last_update = time()
33 # Mark the value as stale
37 # Test if the entries dict is stale
42 return (self._last_update + self._poll_time) < time()
47 self._last_update = time()
56 class File(FreshBase):
57 '''Base for file objects.'''
59 def __init__(self, parent_inode):
60 super(File, self).__init__()
62 self.parent_inode = parent_inode
67 def readfrom(self, off, size):
71 class StreamReaderFile(File):
72 '''Wraps a StreamFileReader as a file.'''
74 def __init__(self, parent_inode, reader, collection):
75 super(StreamReaderFile, self).__init__(parent_inode)
77 self.collection = collection
80 return self.reader.size()
def readfrom(self, off, size):
    '''Delegate the read to the wrapped reader and hand back its result unchanged.

    off, size: forwarded as-is to self.reader.readfrom().
    '''
    wrapped = self.reader
    chunk = wrapped.readfrom(off, size)
    return chunk
89 return collection["created_at"]
92 return collection["modified_at"]
95 class ObjectFile(File):
96 '''Wraps a dict as a serialized json object.'''
def __init__(self, parent_inode, contents):
    '''Keep the source dict, its 'uuid' value, and a JSON rendering of the dict.

    parent_inode: passed through to the base class initializer.
    contents: dict to serialize; its 'uuid' key is read here.
    '''
    super(ObjectFile, self).__init__(parent_inode)
    record = contents
    self.contentsdict = record
    self.uuid = record['uuid']
    # Keys are sorted and nested levels are indented by four spaces.
    self.contents = json.dumps(record, sort_keys=True, indent=4)
105 return len(self.contents)
def readfrom(self, off, size):
    '''Return the slice of self.contents that starts at `off` and spans `size` items.'''
    stop = off + size
    return self.contents[off:stop]
111 class Directory(FreshBase):
112 '''Generic directory object, backed by a dict.
113 Consists of a set of entries with the key representing the filename
114 and the value referencing a File or Directory object.
117 def __init__(self, parent_inode):
118 super(Directory, self).__init__()
120 '''parent_inode is the integer inode number'''
122 if not isinstance(parent_inode, int):
123 raise Exception("parent_inode should be an int")
124 self.parent_inode = parent_inode
127 # Overridden by subclasses to implement logic to update the entries dict
128 # when the directory is stale
132 # Only used when computing the size of the disk footprint of the directory
137 def checkupdate(self):
141 except apiclient.errors.HttpError as e:
144 def __getitem__(self, item):
146 return self._entries[item]
150 return self._entries.items()
154 return self._entries.iterkeys()
156 def __contains__(self, k):
158 return k in self._entries
160 def merge(self, items, fn, same, new_entry):
161 '''Helper method for updating the contents of the directory.
163 items: array with new directory contents
165 fn: function to take an entry in 'items' and return the desired file or
168 same: function to compare an existing entry with an entry in the items
169 list to determine whether to keep the existing entry.
171 new_entry: function to create a new directory entry from array entry.
174 oldentries = self._entries
178 if n in oldentries and same(oldentries[n], i):
179 self._entries[n] = oldentries[n]
184 self._entries[n] = self.inodes.add_entry(ent)
186 llfuse.invalidate_entry(self.inode, str(n))
187 self.inodes.del_entry(oldentries[n])
191 class CollectionDirectory(Directory):
192 '''Represents the root of a directory tree holding a collection.'''
194 def __init__(self, parent_inode, inodes, collection_locator):
195 super(CollectionDirectory, self).__init__(parent_inode)
197 self.collection_locator = collection_locator
200 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
204 collection = arvados.CollectionReader(self.collection_locator)
205 for s in collection.all_streams():
207 for part in s.name().split('/'):
208 if part != '' and part != '.':
209 if part not in cwd._entries:
210 cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
211 cwd = cwd._entries[part]
212 for k, v in s.files().items():
213 cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
216 except Exception as detail:
217 _logger.debug("arv-mount %s: error: %s",
218 self.collection_locator, detail)
221 class MagicDirectory(Directory):
222 '''A special directory that logically contains the set of all extant keep
223 locators. When a file is referenced by lookup(), it is tested to see if it
224 is a valid keep locator to a manifest, and if so, loads the manifest
225 contents as a subdirectory of this directory with the locator as the
226 directory name. Since querying a list of all extant keep locators is
227 impractical, only collections that have already been accessed are visible
231 def __init__(self, parent_inode, inodes):
232 super(MagicDirectory, self).__init__(parent_inode)
235 def __contains__(self, k):
236 if k in self._entries:
239 e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, k))
245 except Exception as e:
246 _logger.debug('arv-mount exception keep %s', e)
249 def __getitem__(self, item):
251 return self._entries[item]
253 raise KeyError("No collection with id " + item)
255 class RecursiveInvalidateDirectory(Directory):
256 def invalidate(self):
258 if self.parent_inode == llfuse.ROOT_INODE:
259 llfuse.lock.acquire()
260 super(RecursiveInvalidateDirectory, self).invalidate()
261 for a in self._entries:
262 self._entries[a].invalidate()
264 if self.parent_inode == llfuse.ROOT_INODE:
265 llfuse.lock.release()
267 class TagsDirectory(RecursiveInvalidateDirectory):
268 '''A special directory that contains as subdirectories all tags visible to the user.'''
270 def __init__(self, parent_inode, inodes, api, poll_time=60):
271 super(TagsDirectory, self).__init__(parent_inode)
275 arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
278 self._poll_time = poll_time
281 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
283 self.merge(tags['items'],
284 lambda i: i['name'] if 'name' in i else i['uuid'],
285 lambda a, i: a.tag == i,
286 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
288 class TagDirectory(Directory):
289 '''A special directory that contains as subdirectories all collections visible
290 to the user that are tagged with a particular tag.
293 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
294 super(TagDirectory, self).__init__(parent_inode)
299 self._poll_time = poll_time
302 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
303 ['name', '=', self.tag],
304 ['head_uuid', 'is_a', 'arvados#collection']],
305 select=['head_uuid']).execute()
306 self.merge(taggedcollections['items'],
307 lambda i: i['head_uuid'],
308 lambda a, i: a.collection_locator == i['head_uuid'],
309 lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
312 class ProjectDirectory(RecursiveInvalidateDirectory):
313 '''A special directory that contains the contents of a project.'''
315 def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
316 super(ProjectDirectory, self).__init__(parent_inode)
319 self.uuid = uuid['uuid']
321 if parent_inode == llfuse.ROOT_INODE:
323 arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
326 self._poll_time = poll_time
329 self._poll_time = poll_time
332 def createDirectory(self, i):
333 if re.match(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}', i['uuid']) and i['name'] is not None:
334 return CollectionDirectory(self.inode, self.inodes, i['uuid'])
335 elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
336 return ProjectDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
337 #elif re.match(r'[a-z0-9]{5}-8i9sb-[a-z0-9]{15}', i['uuid']):
339 #elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
340 # return ObjectFile(self.parent_inode, i)
345 return arvados.util.all_contents(self.api, self.uuid)
349 if isinstance(a, CollectionDirectory):
350 return a.collection_locator == i['uuid']
351 elif isinstance(a, ProjectDirectory):
352 return a.uuid == i['uuid']
353 elif isinstance(a, ObjectFile):
354 return a.uuid == i['uuid'] and not a.stale()
357 self.merge(self.contents(),
358 lambda i: i['name'] if 'name' in i and i['name'] is not None and len(i['name']) > 0 else i['uuid'],
360 self.createDirectory)
363 class HomeDirectory(ProjectDirectory):
364 '''A special directory that represents the "home" project.'''
366 def __init__(self, parent_inode, inodes, api, poll=False, poll_time=60):
367 super(HomeDirectory, self).__init__(parent_inode, inodes, api, api.users().current().execute())
370 # return self.api.groups().contents(uuid=self.uuid).execute()['items']
372 class FileHandle(object):
373 '''Connects a numeric file handle to a File or Directory object that has
374 been opened by the client.'''
376 def __init__(self, fh, entry):
381 class Inodes(object):
382 '''Manage the set of inodes. This is the mapping from a numeric id
383 to a concrete File or Directory object'''
387 self._counter = llfuse.ROOT_INODE
def __getitem__(self, item):
    '''Return the entry stored under `item` in self._entries.'''
    table = self._entries
    return table[item]
def __setitem__(self, key, item):
    '''Bind `item` to `key` in self._entries.'''
    table = self._entries
    table[key] = item
396 return self._entries.iterkeys()
399 return self._entries.items()
def __contains__(self, k):
    '''Report whether `k` is present in self._entries.'''
    present = k in self._entries
    return present
404 def add_entry(self, entry):
405 entry.inode = self._counter
406 self._entries[entry.inode] = entry
def del_entry(self, entry):
    '''Ask llfuse to invalidate the entry's inode, then drop it from self._entries.

    entry: object whose `inode` attribute is the key it is stored under.
    '''
    inode = entry.inode
    llfuse.invalidate_inode(inode)
    del self._entries[inode]
414 class Operations(llfuse.Operations):
415 '''This is the main interface with llfuse. The methods on this object are
416 called by llfuse threads to service FUSE events to query and read from
419 llfuse has its own global lock which is acquired before calling a request handler,
420 so request handlers do not run concurrently unless the lock is explicitly released
421 with llfuse.lock_released.'''
423 def __init__(self, uid, gid):
424 super(Operations, self).__init__()
426 self.inodes = Inodes()
430 # dict of inode to filehandle
431 self._filehandles = {}
432 self._filehandles_counter = 1
434 # Other threads that need to wait until the fuse driver
435 # is fully initialized should wait() on this event object.
436 self.initlock = threading.Event()
439 # Allow threads that are waiting for the driver to finish
440 # initializing to continue.
443 def access(self, inode, mode, ctx):
446 def getattr(self, inode):
447 if inode not in self.inodes:
448 raise llfuse.FUSEError(errno.ENOENT)
450 e = self.inodes[inode]
452 entry = llfuse.EntryAttributes()
455 entry.entry_timeout = 300
456 entry.attr_timeout = 300
458 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
459 if isinstance(e, Directory):
460 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
462 entry.st_mode |= stat.S_IFREG
465 entry.st_uid = self.uid
466 entry.st_gid = self.gid
469 entry.st_size = e.size()
471 entry.st_blksize = 1024
472 entry.st_blocks = e.size()/1024
473 if e.size()/1024 != 0:
481 def lookup(self, parent_inode, name):
482 _logger.debug("arv-mount lookup: parent_inode %i name %s",
489 if parent_inode in self.inodes:
490 p = self.inodes[parent_inode]
492 inode = p.parent_inode
494 inode = p[name].inode
497 return self.getattr(inode)
499 raise llfuse.FUSEError(errno.ENOENT)
501 def open(self, inode, flags):
502 if inode in self.inodes:
503 p = self.inodes[inode]
505 raise llfuse.FUSEError(errno.ENOENT)
507 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
508 raise llfuse.FUSEError(errno.EROFS)
510 if isinstance(p, Directory):
511 raise llfuse.FUSEError(errno.EISDIR)
513 fh = self._filehandles_counter
514 self._filehandles_counter += 1
515 self._filehandles[fh] = FileHandle(fh, p)
518 def read(self, fh, off, size):
519 _logger.debug("arv-mount read %i %i %i", fh, off, size)
520 if fh in self._filehandles:
521 handle = self._filehandles[fh]
523 raise llfuse.FUSEError(errno.EBADF)
526 with llfuse.lock_released:
527 return handle.entry.readfrom(off, size)
529 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the file handle `fh`; a handle that is not registered is ignored.'''
    # pop() with a default removes the key when present and is a no-op otherwise.
    self._filehandles.pop(fh, None)
535 def opendir(self, inode):
536 _logger.debug("arv-mount opendir: inode %i", inode)
538 if inode in self.inodes:
539 p = self.inodes[inode]
541 raise llfuse.FUSEError(errno.ENOENT)
543 if not isinstance(p, Directory):
544 raise llfuse.FUSEError(errno.ENOTDIR)
546 fh = self._filehandles_counter
547 self._filehandles_counter += 1
548 if p.parent_inode in self.inodes:
549 parent = self.inodes[p.parent_inode]
551 raise llfuse.FUSEError(errno.EIO)
553 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
556 def readdir(self, fh, off):
557 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
559 if fh in self._filehandles:
560 handle = self._filehandles[fh]
562 raise llfuse.FUSEError(errno.EBADF)
564 _logger.debug("arv-mount handle.entry %s", handle.entry)
567 while e < len(handle.entry):
568 if handle.entry[e][1].inode in self.inodes:
569 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Forget the directory handle `fh`.

    An unknown handle is ignored rather than raising KeyError, matching the
    tolerant behaviour of release(): a plain KeyError is not a FUSEError and
    must not escape a request handler.
    '''
    self._filehandles.pop(fh, None)
576 st = llfuse.StatvfsData()
577 st.f_bsize = 1024 * 1024
590 # The llfuse documentation recommends only overloading functions that
591 # are actually implemented, as the default implementation will raise ENOSYS.
592 # However, there is a bug in the llfuse default implementation of create()
593 # "create() takes exactly 5 positional arguments (6 given)" which will crash
595 # The workaround is to implement it with the proper number of parameters,
596 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Reject every create request by raising FUSEError(EROFS).

    The five positional parameters exist only so the handler can be called
    with the expected number of arguments; none of them is inspected.
    '''
    refusal = llfuse.FUSEError(errno.EROFS)
    raise refusal