913db4ccfea6fa12bde06b7a623f760f03f99374
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69
70 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
71 from fusefile import StringFile, FuseArvadosFile
72
73 _logger = logging.getLogger('arvados.arvados_fuse')
74
75 log_handler = logging.StreamHandler()
76 llogger = logging.getLogger('llfuse')
77 llogger.addHandler(log_handler)
78 llogger.setLevel(logging.DEBUG)
79
80 class Handle(object):
81     """Connects a numeric file handle to a File or Directory object that has
82     been opened by the client."""
83
84     def __init__(self, fh, obj):
85         self.fh = fh
86         self.obj = obj
87         self.obj.inc_use()
88
89     def release(self):
90         self.obj.dec_use()
91
92     def flush(self):
93         return self.obj.flush()
94
95
96 class FileHandle(Handle):
97     """Connects a numeric file handle to a File  object that has
98     been opened by the client."""
99     pass
100
101
102 class DirectoryHandle(Handle):
103     """Connects a numeric file handle to a Directory object that has
104     been opened by the client."""
105
106     def __init__(self, fh, dirobj, entries):
107         super(DirectoryHandle, self).__init__(fh, dirobj)
108         self.entries = entries
109
110
111 class InodeCache(object):
112     """Records the memory footprint of objects and when they are last used.
113
114     When the cache limit is exceeded, the least recently used objects are
115     cleared.  Clearing the object means discarding its contents to release
116     memory.  The next time the object is accessed, it must be re-fetched from
117     the server.  Note that the inode cache limit is a soft limit; the cache
118     limit may be exceeded if necessary to load very large objects, it may also
119     be exceeded if open file handles prevent objects from being cleared.
120
121     """
122
123     def __init__(self, cap, min_entries=4):
124         self._entries = collections.OrderedDict()
125         self._by_uuid = {}
126         self._counter = itertools.count(0)
127         self.cap = cap
128         self._total = 0
129         self.min_entries = min_entries
130
131     def total(self):
132         return self._total
133
134     def _remove(self, obj, clear):
135         if clear and not obj.clear():
136             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
137             return False
138         self._total -= obj.cache_size
139         del self._entries[obj.cache_priority]
140         if obj.cache_uuid:
141             del self._by_uuid[obj.cache_uuid]
142             obj.cache_uuid = None
143         if clear:
144             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
145         return True
146
147     def cap_cache(self):
148         #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
149         if self._total > self.cap:
150             for key in list(self._entries.keys()):
151                 if self._total < self.cap or len(self._entries) < self.min_entries:
152                     break
153                 self._remove(self._entries[key], True)
154
155     def manage(self, obj):
156         if obj.persisted():
157             obj.cache_priority = next(self._counter)
158             obj.cache_size = obj.objsize()
159             self._entries[obj.cache_priority] = obj
160             obj.cache_uuid = obj.uuid()
161             if obj.cache_uuid:
162                 self._by_uuid[obj.cache_uuid] = obj
163             self._total += obj.objsize()
164             _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
165             self.cap_cache()
166         else:
167             obj.cache_priority = None
168
169     def touch(self, obj):
170         if obj.persisted():
171             if obj.cache_priority in self._entries:
172                 self._remove(obj, False)
173             self.manage(obj)
174
175     def unmanage(self, obj):
176         if obj.persisted() and obj.cache_priority in self._entries:
177             self._remove(obj, True)
178
179     def find(self, uuid):
180         return self._by_uuid.get(uuid)
181
182 class Inodes(object):
183     """Manage the set of inodes.  This is the mapping from a numeric id
184     to a concrete File or Directory object"""
185
186     def __init__(self, inode_cache):
187         self._entries = {}
188         self._counter = itertools.count(llfuse.ROOT_INODE)
189         self.inode_cache = inode_cache
190
191     def __getitem__(self, item):
192         return self._entries[item]
193
194     def __setitem__(self, key, item):
195         self._entries[key] = item
196
197     def __iter__(self):
198         return self._entries.iterkeys()
199
200     def items(self):
201         return self._entries.items()
202
203     def __contains__(self, k):
204         return k in self._entries
205
206     def touch(self, entry):
207         entry._atime = time.time()
208         self.inode_cache.touch(entry)
209
210     def add_entry(self, entry):
211         entry.inode = next(self._counter)
212         if entry.inode == llfuse.ROOT_INODE:
213             entry.inc_ref()
214         self._entries[entry.inode] = entry
215         self.inode_cache.manage(entry)
216         return entry
217
218     def del_entry(self, entry):
219         if entry.ref_count == 0:
220             _logger.debug("Deleting inode %i", entry.inode)
221             self.inode_cache.unmanage(entry)
222             llfuse.invalidate_inode(entry.inode)
223             del self._entries[entry.inode]
224             entry.inode = None
225         else:
226             entry.dead = True
227             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
228
229
230 def catch_exceptions(orig_func):
231     """Catch uncaught exceptions and log them consistently."""
232
233     @functools.wraps(orig_func)
234     def catch_exceptions_wrapper(self, *args, **kwargs):
235         try:
236             return orig_func(self, *args, **kwargs)
237         except llfuse.FUSEError:
238             raise
239         except EnvironmentError as e:
240             raise llfuse.FUSEError(e.errno)
241         except:
242             _logger.exception("Unhandled exception during FUSE operation")
243             raise llfuse.FUSEError(errno.EIO)
244
245     return catch_exceptions_wrapper
246
247
248 class Operations(llfuse.Operations):
249     """This is the main interface with llfuse.
250
251     The methods on this object are called by llfuse threads to service FUSE
252     events to query and read from the file system.
253
254     llfuse has its own global lock which is acquired before calling a request handler,
255     so request handlers do not run concurrently unless the lock is explicitly released
256     using 'with llfuse.lock_released:'
257
258     """
259
260     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
261         super(Operations, self).__init__()
262
263         if not inode_cache:
264             inode_cache = InodeCache(cap=256*1024*1024)
265         self.inodes = Inodes(inode_cache)
266         self.uid = uid
267         self.gid = gid
268         self.encoding = encoding
269
270         # dict of inode to filehandle
271         self._filehandles = {}
272         self._filehandles_counter = itertools.count(0)
273
274         # Other threads that need to wait until the fuse driver
275         # is fully initialized should wait() on this event object.
276         self.initlock = threading.Event()
277
278         self.num_retries = num_retries
279
280         self.events = None
281
282     def init(self):
283         # Allow threads that are waiting for the driver to be finished
284         # initializing to continue
285         self.initlock.set()
286
287     def destroy(self):
288         if self.events:
289             self.events.close()
290
291     def access(self, inode, mode, ctx):
292         return True
293
294     def listen_for_events(self, api_client):
295         self.event = arvados.events.subscribe(api_client,
296                                  [["event_type", "in", ["create", "update", "delete"]]],
297                                  self.on_event)
298
299     def on_event(self, ev):
300         if 'event_type' in ev:
301             with llfuse.lock:
302                 item = self.inodes.inode_cache.find(ev["object_uuid"])
303                 if item is not None:
304                     item.invalidate()
305                     item.update()
306
307                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
308                 if itemparent is not None:
309                     itemparent.invalidate()
310                     itemparent.update()
311
312     @catch_exceptions
313     def getattr(self, inode):
314         if inode not in self.inodes:
315             raise llfuse.FUSEError(errno.ENOENT)
316
317         e = self.inodes[inode]
318
319         entry = llfuse.EntryAttributes()
320         entry.st_ino = inode
321         entry.generation = 0
322         entry.entry_timeout = 300
323         entry.attr_timeout = 300
324
325         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
326         if isinstance(e, Directory):
327             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
328         else:
329             entry.st_mode |= stat.S_IFREG
330             if isinstance(e, FuseArvadosFile):
331                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
332
333         if e.writable():
334             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
335
336         entry.st_nlink = 1
337         entry.st_uid = self.uid
338         entry.st_gid = self.gid
339         entry.st_rdev = 0
340
341         entry.st_size = e.size()
342
343         entry.st_blksize = 512
344         entry.st_blocks = (e.size()/512)+1
345         entry.st_atime = int(e.atime())
346         entry.st_mtime = int(e.mtime())
347         entry.st_ctime = int(e.mtime())
348
349         return entry
350
351     @catch_exceptions
352     def setattr(self, inode, attr):
353         entry = self.getattr(inode)
354
355         e = self.inodes[inode]
356
357         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
358             with llfuse.lock_released:
359                 e.arvfile.truncate(attr.st_size)
360                 entry.st_size = e.arvfile.size()
361
362         return entry
363
364     @catch_exceptions
365     def lookup(self, parent_inode, name):
366         name = unicode(name, self.encoding)
367         inode = None
368
369         if name == '.':
370             inode = parent_inode
371         else:
372             if parent_inode in self.inodes:
373                 p = self.inodes[parent_inode]
374                 if name == '..':
375                     inode = p.parent_inode
376                 elif isinstance(p, Directory) and name in p:
377                     inode = p[name].inode
378
379         if inode != None:
380             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
381                       parent_inode, name, inode)
382             self.inodes[inode].inc_ref()
383             return self.getattr(inode)
384         else:
385             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
386                       parent_inode, name)
387             raise llfuse.FUSEError(errno.ENOENT)
388
389     @catch_exceptions
390     def forget(self, inodes):
391         for inode, nlookup in inodes:
392             ent = self.inodes[inode]
393             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
394             if ent.dec_ref(nlookup) == 0 and ent.dead:
395                 self.inodes.del_entry(ent)
396
397     @catch_exceptions
398     def open(self, inode, flags):
399         if inode in self.inodes:
400             p = self.inodes[inode]
401         else:
402             raise llfuse.FUSEError(errno.ENOENT)
403
404         if isinstance(p, Directory):
405             raise llfuse.FUSEError(errno.EISDIR)
406
407         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
408             raise llfuse.FUSEError(errno.EPERM)
409
410         fh = next(self._filehandles_counter)
411         self._filehandles[fh] = FileHandle(fh, p)
412         self.inodes.touch(p)
413         return fh
414
415     @catch_exceptions
416     def read(self, fh, off, size):
417         _logger.debug("arv-mount read %i %i %i", fh, off, size)
418         if fh in self._filehandles:
419             handle = self._filehandles[fh]
420         else:
421             raise llfuse.FUSEError(errno.EBADF)
422
423         self.inodes.touch(handle.obj)
424
425         try:
426             return handle.obj.readfrom(off, size, self.num_retries)
427         except arvados.errors.NotFoundError as e:
428             _logger.error("Block not found: " + str(e))
429             raise llfuse.FUSEError(errno.EIO)
430
431     @catch_exceptions
432     def write(self, fh, off, buf):
433         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
434         if fh in self._filehandles:
435             handle = self._filehandles[fh]
436         else:
437             raise llfuse.FUSEError(errno.EBADF)
438
439         if not handle.obj.writable():
440             raise llfuse.FUSEError(errno.EPERM)
441
442         self.inodes.touch(handle.obj)
443
444         return handle.obj.writeto(off, buf, self.num_retries)
445
446     @catch_exceptions
447     def release(self, fh):
448         if fh in self._filehandles:
449             try:
450                 self._filehandles[fh].flush()
451             except EnvironmentError as e:
452                 raise llfuse.FUSEError(e.errno)
453             except Exception:
454                 _logger.exception("Flush error")
455             self._filehandles[fh].release()
456             del self._filehandles[fh]
457         self.inodes.inode_cache.cap_cache()
458
459     def releasedir(self, fh):
460         self.release(fh)
461
462     @catch_exceptions
463     def opendir(self, inode):
464         _logger.debug("arv-mount opendir: inode %i", inode)
465
466         if inode in self.inodes:
467             p = self.inodes[inode]
468         else:
469             raise llfuse.FUSEError(errno.ENOENT)
470
471         if not isinstance(p, Directory):
472             raise llfuse.FUSEError(errno.ENOTDIR)
473
474         fh = next(self._filehandles_counter)
475         if p.parent_inode in self.inodes:
476             parent = self.inodes[p.parent_inode]
477         else:
478             raise llfuse.FUSEError(errno.EIO)
479
480         # update atime
481         self.inodes.touch(p)
482
483         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
484         return fh
485
486     @catch_exceptions
487     def readdir(self, fh, off):
488         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
489
490         if fh in self._filehandles:
491             handle = self._filehandles[fh]
492         else:
493             raise llfuse.FUSEError(errno.EBADF)
494
495         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
496
497         e = off
498         while e < len(handle.entries):
499             if handle.entries[e][1].inode in self.inodes:
500                 try:
501                     yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
502                 except UnicodeEncodeError:
503                     pass
504             e += 1
505
506     @catch_exceptions
507     def statfs(self):
508         st = llfuse.StatvfsData()
509         st.f_bsize = 128 * 1024
510         st.f_blocks = 0
511         st.f_files = 0
512
513         st.f_bfree = 0
514         st.f_bavail = 0
515
516         st.f_ffree = 0
517         st.f_favail = 0
518
519         st.f_frsize = 0
520         return st
521
522     def _check_writable(self, inode_parent):
523         if inode_parent in self.inodes:
524             p = self.inodes[inode_parent]
525         else:
526             raise llfuse.FUSEError(errno.ENOENT)
527
528         if not isinstance(p, Directory):
529             raise llfuse.FUSEError(errno.ENOTDIR)
530
531         if not p.writable():
532             raise llfuse.FUSEError(errno.EPERM)
533
534         return p
535
536     @catch_exceptions
537     def create(self, inode_parent, name, mode, flags, ctx):
538         p = self._check_writable(inode_parent)
539         p.create(name)
540
541         # The file entry should have been implicitly created by callback.
542         f = p[name]
543         fh = next(self._filehandles_counter)
544         self._filehandles[fh] = FileHandle(fh, f)
545         self.inodes.touch(p)
546
547         f.inc_ref()
548         return (fh, self.getattr(f.inode))
549
550     @catch_exceptions
551     def mkdir(self, inode_parent, name, mode, ctx):
552         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
553
554         p = self._check_writable(inode_parent)
555         p.mkdir(name)
556
557         # The dir entry should have been implicitly created by callback.
558         d = p[name]
559
560         d.inc_ref()
561         return self.getattr(d.inode)
562
563     @catch_exceptions
564     def unlink(self, inode_parent, name):
565         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
566         p = self._check_writable(inode_parent)
567         p.unlink(name)
568
569     @catch_exceptions
570     def rmdir(self, inode_parent, name):
571         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
572         p = self._check_writable(inode_parent)
573         p.rmdir(name)
574
575     @catch_exceptions
576     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
577         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
578         src = self._check_writable(inode_parent_old)
579         dest = self._check_writable(inode_parent_new)
580         dest.rename(name_old, name_new, src)
581
582     @catch_exceptions
583     def flush(self, fh):
584         if fh in self._filehandles:
585             self._filehandles[fh].flush()
586
587     def fsync(self, fh, datasync):
588         self.flush(fh)
589
590     def fsyncdir(self, fh, datasync):
591         self.flush(fh)