2 # FUSE driver for Arvados Keep
17 from llfuse import FUSEError
19 class Directory(object):
20 '''Generic directory object, backed by a dict.
21 Consists of a set of entries with the key representing the filename
22 and the value referencing a File or Directory object.
25 def __init__(self, parent_inode):
26 '''parent_inode is the integer inode number'''
28 if not isinstance(parent_inode, int):
29 raise Exception("parent_inode should be an int")
30 self.parent_inode = parent_inode
34 self._last_update = time()
37 # Overriden by subclasses to implement logic to update the entries dict
38 # when the directory is stale
42 # Mark the entries dict as stale
46 # Test if the entries dict is stale
51 return (self._last_update + self._poll_time) < time()
56 self._last_update = time()
58 # Only used when computing the size of the disk footprint of the directory
63 def __getitem__(self, item):
66 return self._entries[item]
71 return self._entries.items()
76 return self._entries.iterkeys()
78 def __contains__(self, k):
81 return k in self._entries
84 class CollectionDirectory(Directory):
85 '''Represents the root of a directory tree holding a collection.'''
87 def __init__(self, parent_inode, inodes, collection_locator):
88 super(CollectionDirectory, self).__init__(parent_inode)
90 self.collection_locator = collection_locator
93 collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
94 for s in collection.all_streams():
96 for part in s.name().split('/'):
97 if part != '' and part != '.':
98 if part not in cwd._entries:
99 cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
100 cwd = cwd._entries[part]
101 for k, v in s.files().items():
102 cwd._entries[k] = self.inodes.add_entry(File(cwd.inode, v))
105 class MagicDirectory(Directory):
106 '''A special directory that logically contains the set of all extant keep
107 locators. When a file is referenced by lookup(), it is tested to see if it
108 is a valid keep locator to a manifest, and if so, loads the manifest
109 contents as a subdirectory of this directory with the locator as the
110 directory name. Since querying a list of all extant keep locators is
111 impractical, only collections that have already been accessed are visible
115 def __init__(self, parent_inode, inodes):
116 super(MagicDirectory, self).__init__(parent_inode)
119 def __contains__(self, k):
120 if k in self._entries:
123 if arvados.Keep.get(k):
127 except Exception as e:
128 #print 'exception keep', e
131 def __getitem__(self, item):
132 if item not in self._entries:
133 self._entries[item] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, item))
134 return self._entries[item]
137 class TagsDirectory(Directory):
138 '''A special directory that contains as subdirectories all tags visible to the user.'''
140 def __init__(self, parent_inode, inodes, api, poll_time=60):
141 super(TagsDirectory, self).__init__(parent_inode)
145 arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
148 self._poll_time = poll_time
150 def invalidate(self):
152 super(TagsDirectory, self).invalidate()
153 for a in self._entries:
154 self._entries[a].invalidate()
157 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = 'name').execute()
158 oldentries = self._entries
160 for n in tags['items']:
163 self._entries[n] = oldentries[n]
165 self._entries[n] = self.inodes.add_entry(TagDirectory(self.inode, self.inodes, self.api, n, poll=self._poll, poll_time=self._poll_time))
169 class TagDirectory(Directory):
170 '''A special directory that contains as subdirectories all collections visible
171 to the user that are tagged with a particular tag.
174 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
175 super(TagDirectory, self).__init__(parent_inode)
180 self._poll_time = poll_time
183 collections = self.api.links().list(filters=[['link_class', '=', 'tag'],
184 ['name', '=', self.tag],
185 ['head_uuid', 'is_a', 'arvados#collection']],
186 select=['head_uuid']).execute()
187 oldentries = self._entries
189 for c in collections['items']:
192 self._entries[n] = oldentries[n]
194 self._entries[n] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, n))
198 '''Wraps a StreamFileReader for use by Directory.'''
200 def __init__(self, parent_inode, reader):
202 self.parent_inode = parent_inode
206 return self.reader.size()
209 class FileHandle(object):
210 '''Connects a numeric file handle to a File or Directory object that has
211 been opened by the client.'''
213 def __init__(self, fh, entry):
218 class Inodes(object):
219 '''Manage the set of inodes. This is the mapping from a numeric id
220 to a concrete File or Directory object'''
224 self._counter = llfuse.ROOT_INODE
226 def __getitem__(self, item):
227 return self._entries[item]
229 def __setitem__(self, key, item):
230 self._entries[key] = item
233 return self._entries.iterkeys()
236 return self._entries.items()
238 def __contains__(self, k):
239 return k in self._entries
241 def add_entry(self, entry):
242 entry.inode = self._counter
243 self._entries[entry.inode] = entry
247 class Operations(llfuse.Operations):
248 '''This is the main interface with llfuse. The methods on this object are
249 called by llfuse threads to service FUSE events to query and read from
252 llfuse has its own global lock which is acquired before calling a request handler,
253 so request handlers do not run concurrently unless the lock is explicitly released
254 with llfuse.lock_released.'''
256 def __init__(self, uid, gid):
257 super(Operations, self).__init__()
259 self.inodes = Inodes()
263 # dict of inode to filehandle
264 self._filehandles = {}
265 self._filehandles_counter = 1
267 # Other threads that need to wait until the fuse driver
268 # is fully initialized should wait() on this event object.
269 self.initlock = threading.Event()
272 # Allow threads that are waiting for the driver to be finished
273 # initializing to continue
276 def access(self, inode, mode, ctx):
279 def getattr(self, inode):
280 e = self.inodes[inode]
282 entry = llfuse.EntryAttributes()
285 entry.entry_timeout = 300
286 entry.attr_timeout = 300
288 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
289 if isinstance(e, Directory):
290 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
292 entry.st_mode |= stat.S_IFREG
295 entry.st_uid = self.uid
296 entry.st_gid = self.gid
299 entry.st_size = e.size()
301 entry.st_blksize = 1024
302 entry.st_blocks = e.size()/1024
303 if e.size()/1024 != 0:
311 def lookup(self, parent_inode, name):
312 #print "lookup: parent_inode", parent_inode, "name", name
318 if parent_inode in self.inodes:
319 p = self.inodes[parent_inode]
321 inode = p.parent_inode
323 inode = p[name].inode
326 return self.getattr(inode)
328 raise llfuse.FUSEError(errno.ENOENT)
330 def open(self, inode, flags):
331 if inode in self.inodes:
332 p = self.inodes[inode]
334 raise llfuse.FUSEError(errno.ENOENT)
336 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
337 raise llfuse.FUSEError(errno.EROFS)
339 if isinstance(p, Directory):
340 raise llfuse.FUSEError(errno.EISDIR)
342 fh = self._filehandles_counter
343 self._filehandles_counter += 1
344 self._filehandles[fh] = FileHandle(fh, p)
347 def read(self, fh, off, size):
348 #print "read", fh, off, size
349 if fh in self._filehandles:
350 handle = self._filehandles[fh]
352 raise llfuse.FUSEError(errno.EBADF)
355 with llfuse.lock_released:
356 return handle.entry.reader.readfrom(off, size)
358 raise llfuse.FUSEError(errno.EIO)
360 def release(self, fh):
361 if fh in self._filehandles:
362 del self._filehandles[fh]
364 def opendir(self, inode):
365 #print "opendir: inode", inode
367 if inode in self.inodes:
368 p = self.inodes[inode]
370 raise llfuse.FUSEError(errno.ENOENT)
372 if not isinstance(p, Directory):
373 raise llfuse.FUSEError(errno.ENOTDIR)
375 fh = self._filehandles_counter
376 self._filehandles_counter += 1
377 if p.parent_inode in self.inodes:
378 parent = self.inodes[p.parent_inode]
380 raise llfuse.FUSEError(errno.EIO)
382 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
385 def readdir(self, fh, off):
386 #print "readdir: fh", fh, "off", off
388 if fh in self._filehandles:
389 handle = self._filehandles[fh]
391 raise llfuse.FUSEError(errno.EBADF)
393 #print "handle.entry", handle.entry
396 while e < len(handle.entry):
397 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
400 def releasedir(self, fh):
401 del self._filehandles[fh]
404 st = llfuse.StatvfsData()
405 st.f_bsize = 1024 * 1024
418 # The llfuse documentation recommends only overloading functions that
419 # are actually implemented, as the default implementation will raise ENOSYS.
420 # However, there is a bug in the llfuse default implementation of create()
421 # "create() takes exactly 5 positional arguments (6 given)" which will crash
423 # The workaround is to implement it with the proper number of parameters,
424 # and then everything works out.
425 def create(self, p1, p2, p3, p4, p5):
426 raise llfuse.FUSEError(errno.EROFS)