2 # FUSE driver for Arvados Keep
17 from llfuse import FUSEError
19 class Directory(object):
20 '''Generic directory object, backed by a dict.
21 Consists of a set of entries with the key representing the filename
22 and the value referencing a File or Directory object.
25 def __init__(self, parent_inode):
27 self.parent_inode = parent_inode
30 def __getitem__(self, item):
31 return self._entries[item]
33 def __setitem__(self, key, item):
34 self._entries[key] = item
37 return self._entries.iterkeys()
40 return self._entries.items()
42 def __contains__(self, k):
43 return k in self._entries
48 class MagicDirectory(Directory):
49 '''A special directory that logically contains the set of all extant
50 keep locators. When a file is referenced by lookup(), it is tested
51 to see if it is a valid keep locator to a manifest, and if so, loads the manifest
52 contents as a subdirectory of this directory with the locator as the directory name.
53 Since querying a list of all extant keep locators is impractical, only loaded collections
54 are visible to readdir().'''
56 def __init__(self, parent_inode, inodes):
57 super(MagicDirectory, self).__init__(parent_inode)
60 def __contains__(self, k):
61 if k in self._entries:
64 if arvados.Keep.get(k):
68 except Exception as e:
69 #print 'exception keep', e
def __getitem__(self, item):
    '''Return the Directory for `item`, loading the collection on first use.

    On a cache miss the manifest is fetched with arvados.Keep.get(item),
    wrapped in an arvados.CollectionReader, and loaded into a newly
    registered Directory whose parent inode is this directory's inode.
    Any exception raised while fetching or loading propagates to the
    caller.
    '''
    if item not in self._entries:
        collection = arvados.CollectionReader(arvados.Keep.get(item))
        subdir = self.inodes.add_entry(Directory(self.inode))
        # Populate before caching: if load_collection raises, no
        # partially-populated directory is left cached under this name
        # to be served by later lookups.
        self.inodes.load_collection(subdir, collection)
        self._entries[item] = subdir
    return self._entries[item]
80 '''Wraps a StreamFileReader for use by Directory.'''
82 def __init__(self, parent_inode, reader):
84 self.parent_inode = parent_inode
88 return self.reader.size()
90 class FileHandle(object):
91 '''Connects a numeric file handle to a File or Directory object that has
92 been opened by the client.'''
94 def __init__(self, fh, entry):
99 '''Manage the set of inodes. This is the mapping from a numeric id
100 to a concrete File or Directory object'''
104 self._counter = llfuse.ROOT_INODE
106 def __getitem__(self, item):
107 return self._entries[item]
109 def __setitem__(self, key, item):
110 self._entries[key] = item
113 return self._entries.iterkeys()
116 return self._entries.items()
118 def __contains__(self, k):
119 return k in self._entries
121 def load_collection(self, parent_dir, collection):
122 '''parent_dir is the Directory object that will be populated by the collection.
123 collection is the arvados.CollectionReader to use as the source'''
124 for s in collection.all_streams():
126 for part in s.name().split('/'):
127 if part != '' and part != '.':
129 cwd[part] = self.add_entry(Directory(cwd.inode))
131 for k, v in s.files().items():
132 cwd[k] = self.add_entry(File(cwd.inode, v))
134 def add_entry(self, entry):
135 entry.inode = self._counter
136 self._entries[entry.inode] = entry
140 class Operations(llfuse.Operations):
141 '''This is the main interface with llfuse. The methods on this object are
142 called by llfuse threads to service FUSE events to query and read from
145 llfuse has its own global lock which is acquired before calling a request handler,
146 so request handlers do not run concurrently unless the lock is explicitly released
147 with llfuse.lock_released.'''
149 def __init__(self, uid, gid):
150 super(Operations, self).__init__()
152 self.inodes = Inodes()
156 # dict of inode to filehandle
157 self._filehandles = {}
158 self._filehandles_counter = 1
160 # Other threads that need to wait until the fuse driver
161 # is fully initialized should wait() on this event object.
162 self.initlock = threading.Event()
165 # Allow threads that are waiting for the driver to be finished
166 # initializing to continue
169 def access(self, inode, mode, ctx):
172 def getattr(self, inode):
173 e = self.inodes[inode]
175 entry = llfuse.EntryAttributes()
178 entry.entry_timeout = 300
179 entry.attr_timeout = 300
181 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
182 if isinstance(e, Directory):
183 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
185 entry.st_mode |= stat.S_IFREG
188 entry.st_uid = self.uid
189 entry.st_gid = self.gid
192 entry.st_size = e.size()
194 entry.st_blksize = 1024
195 entry.st_blocks = e.size()/1024
196 if e.size()/1024 != 0:
204 def lookup(self, parent_inode, name):
205 #print "lookup: parent_inode", parent_inode, "name", name
211 if parent_inode in self.inodes:
212 p = self.inodes[parent_inode]
214 inode = p.parent_inode
216 inode = p[name].inode
219 return self.getattr(inode)
221 raise llfuse.FUSEError(errno.ENOENT)
223 def open(self, inode, flags):
224 if inode in self.inodes:
225 p = self.inodes[inode]
227 raise llfuse.FUSEError(errno.ENOENT)
229 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
230 raise llfuse.FUSEError(errno.EROFS)
232 if isinstance(p, Directory):
233 raise llfuse.FUSEError(errno.EISDIR)
235 fh = self._filehandles_counter
236 self._filehandles_counter += 1
237 self._filehandles[fh] = FileHandle(fh, p)
240 def read(self, fh, off, size):
241 #print "read", fh, off, size
242 if fh in self._filehandles:
243 handle = self._filehandles[fh]
245 raise llfuse.FUSEError(errno.EBADF)
248 with llfuse.lock_released:
249 return handle.entry.reader.readfrom(off, size)
251 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the file handle `fh`; a handle that is not open is ignored.'''
    # pop() with a default removes the handle when present and is a
    # no-op otherwise.
    self._filehandles.pop(fh, None)
257 def opendir(self, inode):
258 #print "opendir: inode", inode
260 if inode in self.inodes:
261 p = self.inodes[inode]
263 raise llfuse.FUSEError(errno.ENOENT)
265 if not isinstance(p, Directory):
266 raise llfuse.FUSEError(errno.ENOTDIR)
268 fh = self._filehandles_counter
269 self._filehandles_counter += 1
270 if p.parent_inode in self.inodes:
271 parent = self.inodes[p.parent_inode]
274 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
277 def readdir(self, fh, off):
278 #print "readdir: fh", fh, "off", off
280 if fh in self._filehandles:
281 handle = self._filehandles[fh]
283 raise llfuse.FUSEError(errno.EBADF)
285 #print "handle.entry", handle.entry
288 while e < len(handle.entry):
289 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Discard the directory handle `fh` stored by opendir().

    An unknown handle is ignored rather than raising KeyError, which
    matches how release() treats file handles it does not know.
    '''
    self._filehandles.pop(fh, None)
296 st = llfuse.StatvfsData()
297 st.f_bsize = 1024 * 1024
310 # The llfuse documentation recommends only overriding functions that
311 # are actually implemented, as the default implementation will raise ENOSYS.
312 # However, there is a bug in the llfuse default implementation of create()
313 # "create() takes exactly 5 positional arguments (6 given)" which will crash
315 # The workaround is to implement it with the proper number of parameters,
316 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Refuse file creation: always raises llfuse.FUSEError with errno.EROFS.

    The five positional parameters are accepted only so the call
    succeeds; none of them is used.
    '''
    read_only = llfuse.FUSEError(errno.EROFS)
    raise read_only