2 # FUSE driver for Arvados Keep
16 from llfuse import FUSEError
18 class Directory(object):
19 '''Generic directory object, backed by a dict.
20 Consists of a set of entries with the key representing the filename
21 and the value referencing a File or Directory object.
# Constructor: stores parent_inode on the instance.
# NOTE(review): original lines 25 and 27 are absent from this excerpt;
# the other methods read self._entries, so it is presumably created
# here -- confirm against the full file.
24 def __init__(self, parent_inode):
26 self.parent_inode = parent_inode
def __getitem__(self, item):
    '''Look up and return the entry stored under the name `item`.

    Propagates whatever the backing mapping raises for an unknown
    name (KeyError for a dict).'''
    entries = self._entries
    return entries[item]
def __setitem__(self, key, item):
    '''Store `item` under the name `key`, replacing any existing entry.'''
    entries = self._entries
    entries[key] = item
# NOTE(review): the `def` line for the next statement (original line 35)
# is missing from this excerpt; presumably __iter__ -- confirm.
# Returns an iterator over the keys of the backing dict
# (dict.iterkeys exists only on Python 2).
36 return self._entries.iterkeys()
# NOTE(review): the `def` line for the next statement (original line 38)
# is missing from this excerpt; presumably items() -- confirm.
# Returns the (key, value) pairs of the backing dict.
39 return self._entries.items()
def __contains__(self, k):
    '''Report whether an entry named `k` is present.'''
    entries = self._entries
    return k in entries
47 class MagicDirectory(Directory):
48 '''A special directory that logically contains the set of all extant
49 keep locators. When a file is referenced by lookup(), it is tested
50 to see if it is a valid keep locator to a manifest, and if so, loads the manifest
51 contents as a subdirectory of this directory with the locator as the directory name.
52 Since querying a list of all extant keep locators is impractical, only loaded collections
53 are visible to readdir().'''
# Constructor: forwards parent_inode to the Directory constructor.
# NOTE(review): `inodes` is not used on the lines visible here. Other
# methods of this class read self.inodes, so the missing original
# line 57 presumably stores it -- confirm against the full file.
55 def __init__(self, parent_inode, inodes):
56 super(MagicDirectory, self).__init__(parent_inode)
# Membership test for the magic directory.
# Names already present in self._entries are checked first; otherwise
# arvados.Keep.get(k) is called and its truthiness is tested.
# Any Exception raised along the way is caught by the handler below.
# NOTE(review): original lines 61-62, 64-66 and 69 (the `try:` and the
# return statements) are absent from this excerpt, so the exact return
# values of each branch cannot be confirmed from here.
59 def __contains__(self, k):
60 if k in self._entries:
63 if arvados.Keep.get(k):
67 except Exception as e:
68 #print 'exception keep', e
def __getitem__(self, item):
    '''Return the subdirectory for the name `item`, loading it on first use.

    A name that is already cached in self._entries is returned as is.
    Otherwise `item` is fetched through arvados.Keep.get(), wrapped in an
    arvados.CollectionReader, and loaded into a new Directory that is
    registered with self.inodes and cached under `item`.'''
    entries = self._entries
    if item in entries:
        return entries[item]

    # Fetch first, so nothing is registered if the fetch itself raises.
    collection = arvados.CollectionReader(arvados.Keep.get(item))
    subdir = self.inodes.add_entry(Directory(self.inode))
    entries[item] = subdir
    self.inodes.load_collection(subdir, collection)
    return entries[item]
# NOTE(review): the class header for this docstring (original line 78)
# is missing from this excerpt; presumably `class File` -- confirm.
79 '''Wraps a StreamFileReader for use by Directory.'''
# Constructor: stores parent_inode on the instance.
# NOTE(review): original lines 82 and 84 are absent; size() below reads
# self.reader, so `reader` is presumably stored there -- confirm.
81 def __init__(self, parent_inode, reader):
83 self.parent_inode = parent_inode
# NOTE(review): the `def` line for the next statement (original line 86)
# is missing; presumably size(). It delegates to the reader's size().
87 return self.reader.size()
89 class FileHandle(object):
90 '''Connects a numeric file handle to a File or Directory object that has
91 been opened by the client.'''
# Constructor taking the numeric handle `fh` and the opened `entry`.
# Elsewhere in this file, `entry` is an inode object when created by
# open(), and a list of (name, object) pairs when created by opendir().
# NOTE(review): the constructor body (original lines 94-95) is absent
# from this excerpt; readers access `.entry` on the handle, so it is
# presumably stored here -- confirm.
93 def __init__(self, fh, entry):
# NOTE(review): the class header for this docstring (original line 97)
# is missing from this excerpt; presumably `class Inodes` -- confirm.
98 '''Manage the set of inodes. This is the mapping from a numeric id
99 to a concrete File or Directory object'''
# NOTE(review): the enclosing `def` (presumably __init__, original
# line 101) is missing. The counter starts at llfuse.ROOT_INODE and is
# the value add_entry() assigns as the next inode number.
103 self._counter = llfuse.ROOT_INODE
def __getitem__(self, item):
    '''Return the object registered under the key `item`.

    Propagates whatever the backing mapping raises for an unknown key
    (KeyError for a dict).'''
    table = self._entries
    return table[item]
def __setitem__(self, key, item):
    '''Register `item` under `key`, replacing any previous object.'''
    table = self._entries
    table[key] = item
# NOTE(review): the `def` line for the next statement (original line 111)
# is missing from this excerpt; presumably __iter__ -- confirm.
# Returns an iterator over the keys of the backing dict
# (dict.iterkeys exists only on Python 2).
112 return self._entries.iterkeys()
# NOTE(review): the `def` line for the next statement (original line 114)
# is missing from this excerpt; presumably items() -- confirm.
# Returns the (key, value) pairs of the backing dict.
115 return self._entries.items()
def __contains__(self, k):
    '''Report whether an object is registered under the key `k`.'''
    table = self._entries
    return k in table
# Populate a directory tree from the streams of a collection.
120 def load_collection(self, parent_dir, collection):
121 '''parent_dir is the Directory object that will be populated by the collection.
122 collection is the arvados.CollectionReader to use as the source'''
# One pass per stream of the collection.
# NOTE(review): original lines 124, 127 and 129 are absent from this
# excerpt; `cwd` is assigned on them (presumably starting at parent_dir
# and descending one path component at a time) -- confirm.
123 for s in collection.all_streams():
# The stream name is split on '/'; empty components and '.' are skipped.
125 for part in s.name().split('/'):
126 if part != '' and part != '.':
# A new Directory, parented at cwd's inode, is registered via add_entry()
# and stored under the component name. The guarding condition is on the
# missing line 127.
128 cwd[part] = self.add_entry(Directory(cwd.inode))
# Each file of the stream is wrapped in a File, registered via
# add_entry(), and stored in cwd under its name.
130 for k, v in s.files().items():
131 cwd[k] = self.add_entry(File(cwd.inode, v))
# Register `entry`: give it the current counter value as its inode
# number and index it under that number in self._entries.
# NOTE(review): original lines 136-137 are absent from this excerpt.
# Callers use the return value as the registered object, so the method
# presumably advances self._counter and returns `entry` -- confirm.
133 def add_entry(self, entry):
134 entry.inode = self._counter
135 self._entries[entry.inode] = entry
139 class Operations(llfuse.Operations):
140 '''This is the main interface with llfuse. The methods on this object are
141 called by llfuse threads to service FUSE events to query and read from
144 llfuse has its own global lock which is acquired before calling a request handler,
145 so request handlers do not run concurrently unless the lock is explicitly released
146 with llfuse.lock_released.'''
# Constructor: sets up the inode table, the open-handle table and the
# initialisation event.
# NOTE(review): original lines 150 and 152-154 are absent from this
# excerpt; getattr() reads self.uid and self.gid, so `uid` and `gid`
# are presumably stored on those lines -- confirm.
148 def __init__(self, uid, gid):
149 super(Operations, self).__init__()
151 self.inodes = Inodes()
155 # dict of numeric file handle (fh) to FileHandle object
156 self._filehandles = {}
# Next handle number to hand out; open() and opendir() increment it.
157 self._filehandles_counter = 1
159 # Other threads that need to wait until the fuse driver
160 # is fully initialized should wait() on this event object.
161 self.initlock = threading.Event()
164 # Allow threads that are waiting for the driver to be finished
165 # initializing to continue
# Access-check handler.
# NOTE(review): the body (original line 169) is absent from this
# excerpt, so its result cannot be documented from here.
168 def access(self, inode, mode, ctx):
# Build an llfuse.EntryAttributes describing the object at `inode`.
# The lookup self.inodes[inode] raises if the inode is unknown.
# NOTE(review): several original lines (173, 175-176, 179, 183,
# 185-186, 189-190, 192, 196 onward) are absent from this excerpt,
# including the return statement.
171 def getattr(self, inode):
172 e = self.inodes[inode]
174 entry = llfuse.EntryAttributes()
# Both timeouts are set to 300.
177 entry.entry_timeout = 300
178 entry.attr_timeout = 300
# Everything is readable by user, group and other; no write bits are set.
180 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
# Directories additionally get the execute bits and the directory type flag.
181 if isinstance(e, Directory):
182 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
# Regular-file type flag; presumably the `else` branch (line 183 is missing).
184 entry.st_mode |= stat.S_IFREG
187 entry.st_uid = self.uid
188 entry.st_gid = self.gid
191 entry.st_size = e.size()
193 entry.st_blksize = 1024
# Size divided by 1024 (integer division for int sizes under Python 2).
194 entry.st_blocks = e.size()/1024
# NOTE(review): the body of this condition is not visible. If it is meant
# to round st_blocks up for a partial block, the test probably wants
# `%` rather than `/` -- confirm.
195 if e.size()/1024 != 0:
# Resolve `name` inside the directory `parent_inode` and return its
# attributes via getattr(); raises FUSEError(ENOENT) when not found.
# NOTE(review): original lines 205-209, 212, 214, 216-217 and 219 are
# absent from this excerpt, so the conditions selecting between the two
# `inode = ...` assignments below are not visible. The first one
# presumably handles '..' -- confirm.
203 def lookup(self, parent_inode, name):
204 #print "lookup: parent_inode", parent_inode, "name", name
210 if parent_inode in self.inodes:
211 p = self.inodes[parent_inode]
213 inode = p.parent_inode
# Indexing the parent with the name yields the child object.
215 inode = p[name].inode
218 return self.getattr(inode)
220 raise llfuse.FUSEError(errno.ENOENT)
# Open the file at `inode` for reading and register a FileHandle for it.
# Raises FUSEError with:
#   ENOENT - unknown inode (presumably under an `else:` on missing line 225)
#   EROFS  - O_WRONLY or O_RDWR is set in `flags`
#   EISDIR - the inode is a Directory
# NOTE(review): original lines 225, 227, 230, 233 and 237 are absent
# from this excerpt; the method presumably returns `fh` on 237 -- confirm.
222 def open(self, inode, flags):
223 if inode in self.inodes:
224 p = self.inodes[inode]
226 raise llfuse.FUSEError(errno.ENOENT)
228 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
229 raise llfuse.FUSEError(errno.EROFS)
231 if isinstance(p, Directory):
232 raise llfuse.FUSEError(errno.EISDIR)
# Allocate the next handle number and remember the opened object under it.
234 fh = self._filehandles_counter
235 self._filehandles_counter += 1
236 self._filehandles[fh] = FileHandle(fh, p)
# Read `size` bytes at offset `off` from the file behind handle `fh`.
# Raises FUSEError(EBADF) for an unknown handle. The EIO raise at the
# end presumably sits in an `except` clause on a missing line -- confirm.
# NOTE(review): original lines 243, 245-246 and 249 are absent from
# this excerpt.
239 def read(self, fh, off, size):
240 #print "read", fh, off, size
241 if fh in self._filehandles:
242 handle = self._filehandles[fh]
244 raise llfuse.FUSEError(errno.EBADF)
# The read runs with llfuse's global lock released, so other request
# handlers may run while it is in progress.
247 with llfuse.lock_released:
248 return handle.entry.reader.readfrom(off, size)
250 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    '''Forget the open-file handle `fh`.

    Handles that are not currently registered are silently ignored.'''
    # pop() with a default removes the handle when present and does
    # nothing otherwise.
    self._filehandles.pop(fh, None)
# Open the directory at `inode` and register a handle whose entry is a
# snapshot list of (name, object) pairs.
# Raises FUSEError with:
#   ENOENT  - unknown inode (presumably under an `else:` on missing line 261)
#   ENOTDIR - the inode is not a Directory
# NOTE(review): original lines 258, 261, 263, 266, 271-272 and 274 are
# absent from this excerpt. `parent` must also be assigned when
# p.parent_inode is not in self.inodes (presumably on 271-272), and the
# method presumably returns `fh` -- confirm.
256 def opendir(self, inode):
257 #print "opendir: inode", inode
259 if inode in self.inodes:
260 p = self.inodes[inode]
262 raise llfuse.FUSEError(errno.ENOENT)
264 if not isinstance(p, Directory):
265 raise llfuse.FUSEError(errno.ENOTDIR)
267 fh = self._filehandles_counter
268 self._filehandles_counter += 1
269 if p.parent_inode in self.inodes:
270 parent = self.inodes[p.parent_inode]
# The listing starts with '.' and '..', followed by a copy of the
# directory's current items.
273 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
# Generator over the listing stored by opendir() for handle `fh`.
# Yields (name, attributes from getattr(), index + 1) per entry.
# Raises FUSEError(EBADF) for an unknown handle.
# NOTE(review): original lines 278, 281, 283, 285-286 and 289 are absent
# from this excerpt. The index `e` is initialised and advanced on those
# lines (presumably starting from `off`) -- confirm.
276 def readdir(self, fh, off):
277 #print "readdir: fh", fh, "off", off
279 if fh in self._filehandles:
280 handle = self._filehandles[fh]
282 raise llfuse.FUSEError(errno.EBADF)
284 #print "handle.entry", handle.entry
287 while e < len(handle.entry):
288 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    '''Forget the directory handle `fh` registered by opendir().

    An unknown handle is ignored instead of raising KeyError, which is
    the same tolerance release() shows for file handles.'''
    # pop() with a default removes the handle when present and does
    # nothing otherwise.
    self._filehandles.pop(fh, None)
# NOTE(review): the `def` line for these statements (original line 294)
# and everything after line 296 are absent from this excerpt;
# presumably statfs() -- confirm.
# Creates the llfuse.StatvfsData result and sets its block size to 1 MiB.
295 st = llfuse.StatvfsData()
296 st.f_bsize = 1024 * 1024
309 # The llfuse documentation recommends only overriding functions that
310 # are actually implemented, as the default implementation will raise ENOSYS.
311 # However, there is a bug in the llfuse default implementation of create()
312 # "create() takes exactly 5 positional arguments (6 given)" which will crash
314 # The workaround is to implement it with the proper number of parameters,
315 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    '''Refuse file creation: always raises FUSEError(EROFS).

    The five positional parameters are accepted and ignored.'''
    read_only = llfuse.FUSEError(errno.EROFS)
    raise read_only