2 # FUSE driver for Arvados Keep
16 from llfuse import FUSEError
18 class Directory(object):
19 '''Generic directory object, backed by a dict.
20 Consists of a set of entries with the key representing the filename
21 and the value referencing a File or Directory object.
24 def __init__(self, parent_inode):
26 self.parent_inode = parent_inode
30 # Overridden by subclasses to implement logic to update the entries dict
31 # when the directory is stale
35 # Mark the entries dict as stale
39 # Only used when computing the size of the disk footprint of the directory
44 def __getitem__(self, item):
47 return self._entries[item]
52 return self._entries.items()
57 return self._entries.iterkeys()
59 def __contains__(self, k):
62 return k in self._entries
65 class CollectionDirectory(Directory):
66 '''Represents the root of a directory tree holding a collection.'''
68 def __init__(self, parent_inode, inodes, collection_locator):
69 super(CollectionDirectory, self).__init__(parent_inode)
71 self.collection_locator = collection_locator
74 collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
75 for s in collection.all_streams():
77 for part in s.name().split('/'):
78 if part != '' and part != '.':
79 if part not in cwd._entries:
80 cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
81 cwd = cwd._entries[part]
82 for k, v in s.files().items():
83 cwd._entries[k] = self.inodes.add_entry(File(cwd.inode, v))
87 class MagicDirectory(Directory):
88 '''A special directory that logically contains the set of all extant keep
89 locators. When a file is referenced by lookup(), it is tested to see if it
90 is a valid keep locator to a manifest, and if so, loads the manifest
91 contents as a subdirectory of this directory with the locator as the
92 directory name. Since querying a list of all extant keep locators is
93 impractical, only collections that have already been accessed are visible
97 def __init__(self, parent_inode, inodes):
98 super(MagicDirectory, self).__init__(parent_inode)
101 def __contains__(self, k):
102 if k in self._entries:
105 if arvados.Keep.get(k):
109 except Exception as e:
110 #print 'exception keep', e
def __getitem__(self, item):
    '''Return the entry stored under item, creating it on first access.

    When item is not yet a key of self._entries, a CollectionDirectory is
    constructed with item as its collection locator, registered through
    self.inodes.add_entry(), and remembered under that name before being
    returned.
    '''
    if item in self._entries:
        return self._entries[item]
    # First access: build the directory object, register it and cache it.
    collection_dir = CollectionDirectory(self.inode, self.inodes, item)
    self._entries[item] = self.inodes.add_entry(collection_dir)
    return self._entries[item]
119 class TagsDirectory(Directory):
120 '''A special directory that contains as subdirectories all tags visible to the user.'''
122 def __init__(self, parent_inode, inodes, api):
123 super(TagsDirectory, self).__init__(parent_inode)
128 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = 'name').execute()
129 oldentries = self._entries
131 for n in tags['items']:
133 self._entries[n] = oldentries[n]
135 self._entries[n] = self.inodes.add_entry(TagDirectory(self, inodes, api, n))
139 class TagDirectory(Directory):
140 '''A special directory that contains as subdirectories all collections visible
141 to the user that are tagged with a particular tag.
144 def __init__(self, parent_inode, inodes, api, tag):
145 super(TagDirectory, self).__init__(parent_inode)
151 collections = self.api.links().list(filters=[['link_class', '=', 'tag'],
152 ['name', '=', self.tag],
153 ['head_uuid', 'is_a', 'arvados#collection']],
154 select=['head_uuid']).execute()
155 oldentries = self._entries
157 for c in collections['items']:
159 self._entries[n] = oldentries[n]
161 self._entries[n] = self.inodes.add_entry(CollectionDirectory(self, inodes, api, n['head_uuid']))
166 '''Wraps a StreamFileReader for use by Directory.'''
168 def __init__(self, parent_inode, reader):
170 self.parent_inode = parent_inode
174 return self.reader.size()
177 class FileHandle(object):
178 '''Connects a numeric file handle to a File or Directory object that has
179 been opened by the client.'''
181 def __init__(self, fh, entry):
186 class Inodes(object):
187 '''Manage the set of inodes. This is the mapping from a numeric id
188 to a concrete File or Directory object'''
192 self._counter = llfuse.ROOT_INODE
def __getitem__(self, item):
    '''Return the object stored under item in self._entries.

    Any error raised by the underlying lookup for a missing key is
    propagated unchanged.
    '''
    table = self._entries
    return table[item]
def __setitem__(self, key, item):
    '''Store item under key in self._entries, replacing any previous value.'''
    table = self._entries
    table[key] = item
201 return self._entries.iterkeys()
204 return self._entries.items()
def __contains__(self, k):
    '''Report whether k is a key of self._entries.'''
    table = self._entries
    return k in table
209 def add_entry(self, entry):
210 entry.inode = self._counter
211 self._entries[entry.inode] = entry
215 class Operations(llfuse.Operations):
216 '''This is the main interface with llfuse. The methods on this object are
217 called by llfuse threads to service FUSE events to query and read from
220 llfuse has its own global lock which is acquired before calling a request handler,
221 so request handlers do not run concurrently unless the lock is explicitly released
222 with llfuse.lock_released.'''
224 def __init__(self, uid, gid):
225 super(Operations, self).__init__()
227 self.inodes = Inodes()
231 # dict of numeric file handle to FileHandle object
232 self._filehandles = {}
233 self._filehandles_counter = 1
235 # Other threads that need to wait until the fuse driver
236 # is fully initialized should wait() on this event object.
237 self.initlock = threading.Event()
240 # Allow threads that are waiting for the driver to be finished
241 # initializing to continue
244 def access(self, inode, mode, ctx):
247 def getattr(self, inode):
248 e = self.inodes[inode]
250 entry = llfuse.EntryAttributes()
253 entry.entry_timeout = 300
254 entry.attr_timeout = 300
256 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
257 if isinstance(e, Directory):
258 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
260 entry.st_mode |= stat.S_IFREG
263 entry.st_uid = self.uid
264 entry.st_gid = self.gid
267 entry.st_size = e.size()
269 entry.st_blksize = 1024
270 entry.st_blocks = e.size()/1024
271 if e.size()/1024 != 0:
279 def lookup(self, parent_inode, name):
280 #print "lookup: parent_inode", parent_inode, "name", name
286 if parent_inode in self.inodes:
287 p = self.inodes[parent_inode]
289 inode = p.parent_inode
291 inode = p[name].inode
294 return self.getattr(inode)
296 raise llfuse.FUSEError(errno.ENOENT)
298 def open(self, inode, flags):
299 if inode in self.inodes:
300 p = self.inodes[inode]
302 raise llfuse.FUSEError(errno.ENOENT)
304 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
305 raise llfuse.FUSEError(errno.EROFS)
307 if isinstance(p, Directory):
308 raise llfuse.FUSEError(errno.EISDIR)
310 fh = self._filehandles_counter
311 self._filehandles_counter += 1
312 self._filehandles[fh] = FileHandle(fh, p)
315 def read(self, fh, off, size):
316 #print "read", fh, off, size
317 if fh in self._filehandles:
318 handle = self._filehandles[fh]
320 raise llfuse.FUSEError(errno.EBADF)
323 with llfuse.lock_released:
324 return handle.entry.reader.readfrom(off, size)
326 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the handle fh.

    The entry for fh is dropped from self._filehandles; a handle that is
    not present is silently ignored.
    '''
    # pop() with a default removes the key when present and is a no-op otherwise.
    self._filehandles.pop(fh, None)
332 def opendir(self, inode):
333 #print "opendir: inode", inode
335 if inode in self.inodes:
336 p = self.inodes[inode]
338 raise llfuse.FUSEError(errno.ENOENT)
340 if not isinstance(p, Directory):
341 raise llfuse.FUSEError(errno.ENOTDIR)
343 fh = self._filehandles_counter
344 self._filehandles_counter += 1
345 if p.parent_inode in self.inodes:
346 parent = self.inodes[p.parent_inode]
349 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
352 def readdir(self, fh, off):
353 #print "readdir: fh", fh, "off", off
355 if fh in self._filehandles:
356 handle = self._filehandles[fh]
358 raise llfuse.FUSEError(errno.EBADF)
360 #print "handle.entry", handle.entry
363 while e < len(handle.entry):
364 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Forget the directory handle fh.

    The entry for fh is dropped from self._filehandles.  A handle that is
    not present is ignored, the same way release() treats unknown file
    handles, so a stray or repeated call does not let a KeyError escape
    from the request handler.
    '''
    self._filehandles.pop(fh, None)
371 st = llfuse.StatvfsData()
372 st.f_bsize = 1024 * 1024
385 # The llfuse documentation recommends only overloading functions that
386 # are actually implemented, as the default implementation will raise ENOSYS.
387 # However, there is a bug in the llfuse default implementation of create()
388 # "create() takes exactly 5 positional arguments (6 given)" which will crash
390 # The workaround is to implement it with the proper number of parameters,
391 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Reject every create request with a read-only-filesystem error.

    The five positional parameters exist only so the handler accepts the
    number of arguments it is called with; none of them is inspected.

    Raises llfuse.FUSEError(errno.EROFS) unconditionally.
    '''
    read_only_error = llfuse.FUSEError(errno.EROFS)
    raise read_only_error