cc35d584bd5332e2cda93bca76384ddbd35fd3d6
[arvados.git] / sdk / python / arvados / fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7
8 import llfuse
9 import errno
10 import stat
11 import threading
12 import arvados
13 import pprint
14 import arvados.events
15
16 from time import time
17 from llfuse import FUSEError
18
19 class Directory(object):
20     '''Generic directory object, backed by a dict.
21     Consists of a set of entries with the key representing the filename
22     and the value referencing a File or Directory object.
23     '''
24
25     def __init__(self, parent_inode):
26         '''parent_inode is the integer inode number'''
27         self.inode = None
28         if not isinstance(parent_inode, int):
29             raise Exception("parent_inode should be an int")
30         self.parent_inode = parent_inode
31         self._entries = {}
32         self._stale = True
33         self._poll = False
34         self._last_update = time()
35         self._poll_time = 60
36
37     #  Overriden by subclasses to implement logic to update the entries dict
38     #  when the directory is stale
39     def update(self):
40         pass
41
42     # Mark the entries dict as stale
43     def invalidate(self):
44         self._stale = True
45
46     # Test if the entries dict is stale
47     def stale(self):
48         if self._stale:
49             return True
50         if self._poll:
51             return (self._last_update + self._poll_time) < time()
52         return False
53
54     def fresh(self):
55         self._stale = False
56         self._last_update = time()
57
58     # Only used when computing the size of the disk footprint of the directory
59     # (stub)
60     def size(self):
61         return 0
62
63     def __getitem__(self, item):
64         if self.stale():
65             self.update()
66         return self._entries[item]
67
68     def items(self):
69         if self.stale():
70             self.update()
71         return self._entries.items()
72
73     def __iter__(self):
74         if self.stale():
75             self.update()
76         return self._entries.iterkeys()
77
78     def __contains__(self, k):
79         if self.stale():
80             self.update()
81         return k in self._entries
82
83
84 class CollectionDirectory(Directory):
85     '''Represents the root of a directory tree holding a collection.'''
86
87     def __init__(self, parent_inode, inodes, collection_locator):
88         super(CollectionDirectory, self).__init__(parent_inode)
89         self.inodes = inodes
90         self.collection_locator = collection_locator
91
92     def update(self):
93         collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
94         for s in collection.all_streams():
95             cwd = self
96             for part in s.name().split('/'):
97                 if part != '' and part != '.':
98                     if part not in cwd._entries:
99                         cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
100                     cwd = cwd._entries[part]
101             for k, v in s.files().items():
102                 cwd._entries[k] = self.inodes.add_entry(File(cwd.inode, v))
103         self.fresh()
104
105 class MagicDirectory(Directory):
106     '''A special directory that logically contains the set of all extant keep
107     locators.  When a file is referenced by lookup(), it is tested to see if it
108     is a valid keep locator to a manifest, and if so, loads the manifest
109     contents as a subdirectory of this directory with the locator as the
110     directory name.  Since querying a list of all extant keep locators is
111     impractical, only collections that have already been accessed are visible
112     to readdir().
113     '''
114
115     def __init__(self, parent_inode, inodes):
116         super(MagicDirectory, self).__init__(parent_inode)
117         self.inodes = inodes
118
119     def __contains__(self, k):
120         if k in self._entries:
121             return True
122         try:
123             if arvados.Keep.get(k):
124                 return True
125             else:
126                 return False
127         except Exception as e:
128             #print 'exception keep', e
129             return False
130
131     def __getitem__(self, item):
132         if item not in self._entries:
133             self._entries[item] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, item))
134         return self._entries[item]
135
136
137 class TagsDirectory(Directory):
138     '''A special directory that contains as subdirectories all tags visible to the user.'''
139
140     def __init__(self, parent_inode, inodes, api, poll_time=60):
141         super(TagsDirectory, self).__init__(parent_inode)
142         self.inodes = inodes
143         self.api = api
144         try:
145             arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
146         except:
147             self._poll = True
148             self._poll_time = poll_time
149
150     def invalidate(self):
151         with llfuse.lock:
152             super(TagsDirectory, self).invalidate()
153             for a in self._entries:
154                 self._entries[a].invalidate()
155
156     def update(self):
157         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = 'name').execute()
158         oldentries = self._entries
159         self._entries = {}
160         for n in tags['items']:
161             n = n['name']
162             if n in oldentries:
163                 self._entries[n] = oldentries[n]
164             else:
165                 self._entries[n] = self.inodes.add_entry(TagDirectory(self.inode, self.inodes, self.api, n, poll=self._poll, poll_time=self._poll_time))
166         self.fresh()
167
168
169 class TagDirectory(Directory):
170     '''A special directory that contains as subdirectories all collections visible
171     to the user that are tagged with a particular tag.
172     '''
173
174     def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
175         super(TagDirectory, self).__init__(parent_inode)
176         self.inodes = inodes
177         self.api = api
178         self.tag = tag
179         self._poll = poll
180         self._poll_time = poll_time
181
182     def update(self):
183         collections = self.api.links().list(filters=[['link_class', '=', 'tag'],
184                                                ['name', '=', self.tag],
185                                                ['head_uuid', 'is_a', 'arvados#collection']],
186                                       select=['head_uuid']).execute()
187         oldentries = self._entries
188         self._entries = {}
189         for c in collections['items']:
190             n = c['head_uuid']
191             if n in oldentries:
192                 self._entries[n] = oldentries[n]
193             else:
194                 self._entries[n] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, n))
195         self.fresh()
196
197 class File(object):
198     '''Wraps a StreamFileReader for use by Directory.'''
199
200     def __init__(self, parent_inode, reader):
201         self.inode = None
202         self.parent_inode = parent_inode
203         self.reader = reader
204
205     def size(self):
206         return self.reader.size()
207
208
209 class FileHandle(object):
210     '''Connects a numeric file handle to a File or Directory object that has
211     been opened by the client.'''
212
213     def __init__(self, fh, entry):
214         self.fh = fh
215         self.entry = entry
216
217
218 class Inodes(object):
219     '''Manage the set of inodes.  This is the mapping from a numeric id
220     to a concrete File or Directory object'''
221
222     def __init__(self):
223         self._entries = {}
224         self._counter = llfuse.ROOT_INODE
225
226     def __getitem__(self, item):
227         return self._entries[item]
228
229     def __setitem__(self, key, item):
230         self._entries[key] = item
231
232     def __iter__(self):
233         return self._entries.iterkeys()
234
235     def items(self):
236         return self._entries.items()
237
238     def __contains__(self, k):
239         return k in self._entries
240
241     def add_entry(self, entry):
242         entry.inode = self._counter
243         self._entries[entry.inode] = entry
244         self._counter += 1
245         return entry
246
247 class Operations(llfuse.Operations):
248     '''This is the main interface with llfuse.  The methods on this object are
249     called by llfuse threads to service FUSE events to query and read from
250     the file system.
251
252     llfuse has its own global lock which is acquired before calling a request handler,
253     so request handlers do not run concurrently unless the lock is explicitly released
254     with llfuse.lock_released.'''
255
256     def __init__(self, uid, gid):
257         super(Operations, self).__init__()
258
259         self.inodes = Inodes()
260         self.uid = uid
261         self.gid = gid
262
263         # dict of inode to filehandle
264         self._filehandles = {}
265         self._filehandles_counter = 1
266
267         # Other threads that need to wait until the fuse driver
268         # is fully initialized should wait() on this event object.
269         self.initlock = threading.Event()
270
271     def init(self):
272         # Allow threads that are waiting for the driver to be finished
273         # initializing to continue
274         self.initlock.set()
275
276     def access(self, inode, mode, ctx):
277         return True
278
279     def getattr(self, inode):
280         e = self.inodes[inode]
281
282         entry = llfuse.EntryAttributes()
283         entry.st_ino = inode
284         entry.generation = 0
285         entry.entry_timeout = 300
286         entry.attr_timeout = 300
287
288         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
289         if isinstance(e, Directory):
290             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
291         else:
292             entry.st_mode |= stat.S_IFREG
293
294         entry.st_nlink = 1
295         entry.st_uid = self.uid
296         entry.st_gid = self.gid
297         entry.st_rdev = 0
298
299         entry.st_size = e.size()
300
301         entry.st_blksize = 1024
302         entry.st_blocks = e.size()/1024
303         if e.size()/1024 != 0:
304             entry.st_blocks += 1
305         entry.st_atime = 0
306         entry.st_mtime = 0
307         entry.st_ctime = 0
308
309         return entry
310
311     def lookup(self, parent_inode, name):
312         #print "lookup: parent_inode", parent_inode, "name", name
313         inode = None
314
315         if name == '.':
316             inode = parent_inode
317         else:
318             if parent_inode in self.inodes:
319                 p = self.inodes[parent_inode]
320                 if name == '..':
321                     inode = p.parent_inode
322                 elif name in p:
323                     inode = p[name].inode
324
325         if inode != None:
326             return self.getattr(inode)
327         else:
328             raise llfuse.FUSEError(errno.ENOENT)
329
330     def open(self, inode, flags):
331         if inode in self.inodes:
332             p = self.inodes[inode]
333         else:
334             raise llfuse.FUSEError(errno.ENOENT)
335
336         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
337             raise llfuse.FUSEError(errno.EROFS)
338
339         if isinstance(p, Directory):
340             raise llfuse.FUSEError(errno.EISDIR)
341
342         fh = self._filehandles_counter
343         self._filehandles_counter += 1
344         self._filehandles[fh] = FileHandle(fh, p)
345         return fh
346
347     def read(self, fh, off, size):
348         #print "read", fh, off, size
349         if fh in self._filehandles:
350             handle = self._filehandles[fh]
351         else:
352             raise llfuse.FUSEError(errno.EBADF)
353
354         try:
355             with llfuse.lock_released:
356                 return handle.entry.reader.readfrom(off, size)
357         except:
358             raise llfuse.FUSEError(errno.EIO)
359
360     def release(self, fh):
361         if fh in self._filehandles:
362             del self._filehandles[fh]
363
364     def opendir(self, inode):
365         #print "opendir: inode", inode
366
367         if inode in self.inodes:
368             p = self.inodes[inode]
369         else:
370             raise llfuse.FUSEError(errno.ENOENT)
371
372         if not isinstance(p, Directory):
373             raise llfuse.FUSEError(errno.ENOTDIR)
374
375         fh = self._filehandles_counter
376         self._filehandles_counter += 1
377         if p.parent_inode in self.inodes:
378             parent = self.inodes[p.parent_inode]
379         else:
380             raise llfuse.FUSEError(errno.EIO)
381
382         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
383         return fh
384
385     def readdir(self, fh, off):
386         #print "readdir: fh", fh, "off", off
387
388         if fh in self._filehandles:
389             handle = self._filehandles[fh]
390         else:
391             raise llfuse.FUSEError(errno.EBADF)
392
393         #print "handle.entry", handle.entry
394
395         e = off
396         while e < len(handle.entry):
397             yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
398             e += 1
399
400     def releasedir(self, fh):
401         del self._filehandles[fh]
402
403     def statfs(self):
404         st = llfuse.StatvfsData()
405         st.f_bsize = 1024 * 1024
406         st.f_blocks = 0
407         st.f_files = 0
408
409         st.f_bfree = 0
410         st.f_bavail = 0
411
412         st.f_ffree = 0
413         st.f_favail = 0
414
415         st.f_frsize = 0
416         return st
417
418     # The llfuse documentation recommends only overloading functions that
419     # are actually implemented, as the default implementation will raise ENOSYS.
420     # However, there is a bug in the llfuse default implementation of create()
421     # "create() takes exactly 5 positional arguments (6 given)" which will crash
422     # arv-mount.
423     # The workaround is to implement it with the proper number of parameters,
424     # and then everything works out.
425     def create(self, p1, p2, p3, p4, p5):
426         raise llfuse.FUSEError(errno.EROFS)