]> git.arvados.org - arvados.git/blob - sdk/python/arvados/fuse.py
Refactored directory handling a bit in fuse to better accomodate other virtual
[arvados.git] / sdk / python / arvados / fuse.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7
8 import llfuse
9 import errno
10 import stat
11 import threading
12 import arvados
13 import pprint
14
15 from time import time
16 from llfuse import FUSEError
17
18 class Directory(object):
19     '''Generic directory object, backed by a dict.
20     Consists of a set of entries with the key representing the filename
21     and the value referencing a File or Directory object.
22     '''
23
24     def __init__(self, parent_inode):
25         self.inode = None
26         self.parent_inode = parent_inode
27         self._entries = {}
28         self.stale = True
29
30     #  Overriden by subclasses to implement logic to update the entries dict
31     #  when the directory is stale
32     def update(self):
33         pass
34
35     # Mark the entries dict as stale
36     def invalidate(self):
37         self.stale = True
38
39     # Only used when computing the size of the disk footprint of the directory
40     # (stub)
41     def size(self):
42         return 0
43
44     def __getitem__(self, item):
45         if self.stale:
46             self.update()
47         return self._entries[item]
48
49     def items(self):
50         if self.stale:
51             self.update()
52         return self._entries.items()
53
54     def __iter__(self):
55         if self.stale:
56             self.update()
57         return self._entries.iterkeys()
58
59     def __contains__(self, k):
60         if self.stale:
61             self.update()
62         return k in self._entries
63
64
65 class CollectionDirectory(Directory):
66     '''Represents the root of a directory tree holding a collection.'''
67
68     def __init__(self, parent_inode, inodes, collection_locator):
69         super(CollectionDirectory, self).__init__(parent_inode)
70         self.inodes = inodes
71         self.collection_locator = collection_locator
72
73     def update(self):
74         collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
75         for s in collection.all_streams():
76             cwd = self
77             for part in s.name().split('/'):
78                 if part != '' and part != '.':
79                     if part not in cwd._entries:
80                         cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
81                     cwd = cwd._entries[part]
82             for k, v in s.files().items():
83                 cwd._entries[k] = self.inodes.add_entry(File(cwd.inode, v))
84         self.stale = False
85
86
87 class MagicDirectory(Directory):
88     '''A special directory that logically contains the set of all extant keep
89     locators.  When a file is referenced by lookup(), it is tested to see if it
90     is a valid keep locator to a manifest, and if so, loads the manifest
91     contents as a subdirectory of this directory with the locator as the
92     directory name.  Since querying a list of all extant keep locators is
93     impractical, only collections that have already been accessed are visible
94     to readdir().
95     '''
96
97     def __init__(self, parent_inode, inodes):
98         super(MagicDirectory, self).__init__(parent_inode)
99         self.inodes = inodes
100
101     def __contains__(self, k):
102         if k in self._entries:
103             return True
104         try:
105             if arvados.Keep.get(k):
106                 return True
107             else:
108                 return False
109         except Exception as e:
110             #print 'exception keep', e
111             return False
112
113     def __getitem__(self, item):
114         if item not in self._entries:
115             self._entries[item] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, item))
116         return self._entries[item]
117
118
119 class TagsDirectory(Directory):
120     '''A special directory that contains as subdirectories all tags visible to the user.'''
121
122     def __init__(self, parent_inode, inodes, api):
123         super(TagsDirectory, self).__init__(parent_inode)
124         self.inodes = inodes
125         self.api = api
126
127     def update(self):
128         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = 'name').execute()
129         oldentries = self._entries
130         self._entries = {}
131         for n in tags['items']:
132             if n in oldentries:
133                 self._entries[n] = oldentries[n]
134             else:
135                 self._entries[n] = self.inodes.add_entry(TagDirectory(self, inodes, api, n))
136         self.stale = False
137
138
139 class TagDirectory(Directory):
140     '''A special directory that contains as subdirectories all collections visible
141     to the user that are tagged with a particular tag.
142     '''
143
144     def __init__(self, parent_inode, inodes, api, tag):
145         super(TagDirectory, self).__init__(parent_inode)
146         self.inodes = inodes
147         self.api = api
148         self.tag = tag
149
150     def update(self):
151         collections = self.api.links().list(filters=[['link_class', '=', 'tag'],
152                                                ['name', '=', self.tag],
153                                                ['head_uuid', 'is_a', 'arvados#collection']],
154                                       select=['head_uuid']).execute()
155         oldentries = self._entries
156         self._entries = {}
157         for c in collections['items']:
158             if n in oldentries:
159                 self._entries[n] = oldentries[n]
160             else:
161                 self._entries[n] = self.inodes.add_entry(CollectionDirectory(self, inodes, api, n['head_uuid']))
162         self.stale = False
163
164
165 class File(object):
166     '''Wraps a StreamFileReader for use by Directory.'''
167
168     def __init__(self, parent_inode, reader):
169         self.inode = None
170         self.parent_inode = parent_inode
171         self.reader = reader
172
173     def size(self):
174         return self.reader.size()
175
176
177 class FileHandle(object):
178     '''Connects a numeric file handle to a File or Directory object that has
179     been opened by the client.'''
180
181     def __init__(self, fh, entry):
182         self.fh = fh
183         self.entry = entry
184
185
186 class Inodes(object):
187     '''Manage the set of inodes.  This is the mapping from a numeric id
188     to a concrete File or Directory object'''
189
190     def __init__(self):
191         self._entries = {}
192         self._counter = llfuse.ROOT_INODE
193
194     def __getitem__(self, item):
195         return self._entries[item]
196
197     def __setitem__(self, key, item):
198         self._entries[key] = item
199
200     def __iter__(self):
201         return self._entries.iterkeys()
202
203     def items(self):
204         return self._entries.items()
205
206     def __contains__(self, k):
207         return k in self._entries
208
209     def add_entry(self, entry):
210         entry.inode = self._counter
211         self._entries[entry.inode] = entry
212         self._counter += 1
213         return entry
214
215 class Operations(llfuse.Operations):
216     '''This is the main interface with llfuse.  The methods on this object are
217     called by llfuse threads to service FUSE events to query and read from
218     the file system.
219
220     llfuse has its own global lock which is acquired before calling a request handler,
221     so request handlers do not run concurrently unless the lock is explicitly released
222     with llfuse.lock_released.'''
223
224     def __init__(self, uid, gid):
225         super(Operations, self).__init__()
226
227         self.inodes = Inodes()
228         self.uid = uid
229         self.gid = gid
230
231         # dict of inode to filehandle
232         self._filehandles = {}
233         self._filehandles_counter = 1
234
235         # Other threads that need to wait until the fuse driver
236         # is fully initialized should wait() on this event object.
237         self.initlock = threading.Event()
238
239     def init(self):
240         # Allow threads that are waiting for the driver to be finished
241         # initializing to continue
242         self.initlock.set()
243
244     def access(self, inode, mode, ctx):
245         return True
246
247     def getattr(self, inode):
248         e = self.inodes[inode]
249
250         entry = llfuse.EntryAttributes()
251         entry.st_ino = inode
252         entry.generation = 0
253         entry.entry_timeout = 300
254         entry.attr_timeout = 300
255
256         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
257         if isinstance(e, Directory):
258             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
259         else:
260             entry.st_mode |= stat.S_IFREG
261
262         entry.st_nlink = 1
263         entry.st_uid = self.uid
264         entry.st_gid = self.gid
265         entry.st_rdev = 0
266
267         entry.st_size = e.size()
268
269         entry.st_blksize = 1024
270         entry.st_blocks = e.size()/1024
271         if e.size()/1024 != 0:
272             entry.st_blocks += 1
273         entry.st_atime = 0
274         entry.st_mtime = 0
275         entry.st_ctime = 0
276
277         return entry
278
279     def lookup(self, parent_inode, name):
280         #print "lookup: parent_inode", parent_inode, "name", name
281         inode = None
282
283         if name == '.':
284             inode = parent_inode
285         else:
286             if parent_inode in self.inodes:
287                 p = self.inodes[parent_inode]
288                 if name == '..':
289                     inode = p.parent_inode
290                 elif name in p:
291                     inode = p[name].inode
292
293         if inode != None:
294             return self.getattr(inode)
295         else:
296             raise llfuse.FUSEError(errno.ENOENT)
297
298     def open(self, inode, flags):
299         if inode in self.inodes:
300             p = self.inodes[inode]
301         else:
302             raise llfuse.FUSEError(errno.ENOENT)
303
304         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
305             raise llfuse.FUSEError(errno.EROFS)
306
307         if isinstance(p, Directory):
308             raise llfuse.FUSEError(errno.EISDIR)
309
310         fh = self._filehandles_counter
311         self._filehandles_counter += 1
312         self._filehandles[fh] = FileHandle(fh, p)
313         return fh
314
315     def read(self, fh, off, size):
316         #print "read", fh, off, size
317         if fh in self._filehandles:
318             handle = self._filehandles[fh]
319         else:
320             raise llfuse.FUSEError(errno.EBADF)
321
322         try:
323             with llfuse.lock_released:
324                 return handle.entry.reader.readfrom(off, size)
325         except:
326             raise llfuse.FUSEError(errno.EIO)
327
328     def release(self, fh):
329         if fh in self._filehandles:
330             del self._filehandles[fh]
331
332     def opendir(self, inode):
333         #print "opendir: inode", inode
334
335         if inode in self.inodes:
336             p = self.inodes[inode]
337         else:
338             raise llfuse.FUSEError(errno.ENOENT)
339
340         if not isinstance(p, Directory):
341             raise llfuse.FUSEError(errno.ENOTDIR)
342
343         fh = self._filehandles_counter
344         self._filehandles_counter += 1
345         if p.parent_inode in self.inodes:
346             parent = self.inodes[p.parent_inode]
347         else:
348             parent = None
349         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
350         return fh
351
352     def readdir(self, fh, off):
353         #print "readdir: fh", fh, "off", off
354
355         if fh in self._filehandles:
356             handle = self._filehandles[fh]
357         else:
358             raise llfuse.FUSEError(errno.EBADF)
359
360         #print "handle.entry", handle.entry
361
362         e = off
363         while e < len(handle.entry):
364             yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
365             e += 1
366
367     def releasedir(self, fh):
368         del self._filehandles[fh]
369
370     def statfs(self):
371         st = llfuse.StatvfsData()
372         st.f_bsize = 1024 * 1024
373         st.f_blocks = 0
374         st.f_files = 0
375
376         st.f_bfree = 0
377         st.f_bavail = 0
378
379         st.f_ffree = 0
380         st.f_favail = 0
381
382         st.f_frsize = 0
383         return st
384
385     # The llfuse documentation recommends only overloading functions that
386     # are actually implemented, as the default implementation will raise ENOSYS.
387     # However, there is a bug in the llfuse default implementation of create()
388     # "create() takes exactly 5 positional arguments (6 given)" which will crash
389     # arv-mount.
390     # The workaround is to implement it with the proper number of parameters,
391     # and then everything works out.
392     def create(self, p1, p2, p3, p4, p5):
393         raise llfuse.FUSEError(errno.EROFS)