Merge branch 'master' into 3198-writable-fuse
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69
70 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
71 from fusefile import StringFile, FuseArvadosFile
72
73 _logger = logging.getLogger('arvados.arvados_fuse')
74
75 log_handler = logging.StreamHandler()
76 llogger = logging.getLogger('llfuse')
77 llogger.addHandler(log_handler)
78 llogger.setLevel(logging.DEBUG)
79
80 class Handle(object):
81     """Connects a numeric file handle to a File or Directory object that has
82     been opened by the client."""
83
84     def __init__(self, fh, obj):
85         self.fh = fh
86         self.obj = obj
87         self.obj.inc_use()
88
89     def release(self):
90         self.obj.dec_use()
91
92     def flush(self):
93         return self.obj.flush()
94
95
96 class FileHandle(Handle):
97     """Connects a numeric file handle to a File  object that has
98     been opened by the client."""
99     pass
100
101
102 class DirectoryHandle(Handle):
103     """Connects a numeric file handle to a Directory object that has
104     been opened by the client."""
105
106     def __init__(self, fh, dirobj, entries):
107         super(DirectoryHandle, self).__init__(fh, dirobj)
108         self.entries = entries
109
110
111 class InodeCache(object):
112     """Records the memory footprint of objects and when they are last used.
113
114     When the cache limit is exceeded, the least recently used objects are
115     cleared.  Clearing the object means discarding its contents to release
116     memory.  The next time the object is accessed, it must be re-fetched from
117     the server.  Note that the inode cache limit is a soft limit; the cache
118     limit may be exceeded if necessary to load very large objects, it may also
119     be exceeded if open file handles prevent objects from being cleared.
120
121     """
122
123     def __init__(self, cap, min_entries=4):
124         self._entries = collections.OrderedDict()
125         self._by_uuid = {}
126         self._counter = itertools.count(0)
127         self.cap = cap
128         self._total = 0
129         self.min_entries = min_entries
130
131     def total(self):
132         return self._total
133
134     def _remove(self, obj, clear):
135         if clear and not obj.clear():
136             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
137             return False
138         self._total -= obj.cache_size
139         del self._entries[obj.cache_priority]
140         if obj.cache_uuid:
141             del self._by_uuid[obj.cache_uuid]
142             obj.cache_uuid = None
143         if clear:
144             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
145         return True
146
147     def cap_cache(self):
148         #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
149         if self._total > self.cap:
150             for key in list(self._entries.keys()):
151                 if self._total < self.cap or len(self._entries) < self.min_entries:
152                     break
153                 self._remove(self._entries[key], True)
154
155     def manage(self, obj):
156         if obj.persisted():
157             obj.cache_priority = next(self._counter)
158             obj.cache_size = obj.objsize()
159             self._entries[obj.cache_priority] = obj
160             obj.cache_uuid = obj.uuid()
161             if obj.cache_uuid:
162                 self._by_uuid[obj.cache_uuid] = obj
163             self._total += obj.objsize()
164             _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
165             self.cap_cache()
166         else:
167             obj.cache_priority = None
168
169     def touch(self, obj):
170         if obj.persisted():
171             if obj.cache_priority in self._entries:
172                 self._remove(obj, False)
173             self.manage(obj)
174
175     def unmanage(self, obj):
176         if obj.persisted() and obj.cache_priority in self._entries:
177             self._remove(obj, True)
178
179     def find(self, uuid):
180         return self._by_uuid.get(uuid)
181
182 class Inodes(object):
183     """Manage the set of inodes.  This is the mapping from a numeric id
184     to a concrete File or Directory object"""
185
186     def __init__(self, inode_cache):
187         self._entries = {}
188         self._counter = itertools.count(llfuse.ROOT_INODE)
189         self.inode_cache = inode_cache
190
191     def __getitem__(self, item):
192         return self._entries[item]
193
194     def __setitem__(self, key, item):
195         self._entries[key] = item
196
197     def __iter__(self):
198         return self._entries.iterkeys()
199
200     def items(self):
201         return self._entries.items()
202
203     def __contains__(self, k):
204         return k in self._entries
205
206     def touch(self, entry):
207         entry._atime = time.time()
208         self.inode_cache.touch(entry)
209
210     def add_entry(self, entry):
211         entry.inode = next(self._counter)
212         if entry.inode == llfuse.ROOT_INODE:
213             entry.inc_ref()
214         self._entries[entry.inode] = entry
215         self.inode_cache.manage(entry)
216         return entry
217
218     def del_entry(self, entry):
219         if entry.ref_count == 0:
220             _logger.debug("Deleting inode %i", entry.inode)
221             self.inode_cache.unmanage(entry)
222             llfuse.invalidate_inode(entry.inode)
223             del self._entries[entry.inode]
224             entry.inode = None
225         else:
226             entry.dead = True
227             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
228
229
230 def catch_exceptions(orig_func):
231     """Catch uncaught exceptions and log them consistently."""
232
233     @functools.wraps(orig_func)
234     def catch_exceptions_wrapper(self, *args, **kwargs):
235         try:
236             return orig_func(self, *args, **kwargs)
237         except llfuse.FUSEError:
238             raise
239         except EnvironmentError as e:
240             raise llfuse.FUSEError(e.errno)
241         except:
242             _logger.exception("Unhandled exception during FUSE operation")
243             raise llfuse.FUSEError(errno.EIO)
244
245     return catch_exceptions_wrapper
246
247
248 class Operations(llfuse.Operations):
249     """This is the main interface with llfuse.
250
251     The methods on this object are called by llfuse threads to service FUSE
252     events to query and read from the file system.
253
254     llfuse has its own global lock which is acquired before calling a request handler,
255     so request handlers do not run concurrently unless the lock is explicitly released
256     using 'with llfuse.lock_released:'
257
258     """
259
260     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
261         super(Operations, self).__init__()
262
263         if not inode_cache:
264             inode_cache = InodeCache(cap=256*1024*1024)
265         self.inodes = Inodes(inode_cache)
266         self.uid = uid
267         self.gid = gid
268         self.encoding = encoding
269
270         # dict of inode to filehandle
271         self._filehandles = {}
272         self._filehandles_counter = itertools.count(0)
273
274         # Other threads that need to wait until the fuse driver
275         # is fully initialized should wait() on this event object.
276         self.initlock = threading.Event()
277
278         self.num_retries = num_retries
279
280         self.events = None
281
282     def init(self):
283         # Allow threads that are waiting for the driver to be finished
284         # initializing to continue
285         self.initlock.set()
286
287     def destroy(self):
288         if self.events:
289             self.events.close()
290
291     def access(self, inode, mode, ctx):
292         return True
293
294     def listen_for_events(self, api_client):
295         self.event = arvados.events.subscribe(api_client,
296                                  [["event_type", "in", ["create", "update", "delete"]]],
297                                  self.on_event)
298
299     def on_event(self, ev):
300         if 'event_type' in ev:
301             with llfuse.lock:
302                 item = self.inodes.inode_cache.find(ev["object_uuid"])
303                 if item is not None:
304                     item.invalidate()
305                     if ev["object_kind"] == "arvados#collection":
306                         item.update(to_pdh=ev.get("properties", {}).get("new_attributes", {}).get("portable_data_hash"))
307                     else:
308                         item.update()
309
310                 oldowner = ev.get("properties", {}).get("old_attributes", {}).get("owner_uuid")
311                 olditemparent = self.inodes.inode_cache.find(oldowner)
312                 if olditemparent is not None:
313                     olditemparent.invalidate()
314                     olditemparent.update()
315
316                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
317                 if itemparent is not None:
318                     itemparent.invalidate()
319                     itemparent.update()
320
321     @catch_exceptions
322     def getattr(self, inode):
323         if inode not in self.inodes:
324             raise llfuse.FUSEError(errno.ENOENT)
325
326         e = self.inodes[inode]
327
328         entry = llfuse.EntryAttributes()
329         entry.st_ino = inode
330         entry.generation = 0
331         entry.entry_timeout = 300
332         entry.attr_timeout = 300
333
334         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
335         if isinstance(e, Directory):
336             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
337         else:
338             entry.st_mode |= stat.S_IFREG
339             if isinstance(e, FuseArvadosFile):
340                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
341
342         if e.writable():
343             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
344
345         entry.st_nlink = 1
346         entry.st_uid = self.uid
347         entry.st_gid = self.gid
348         entry.st_rdev = 0
349
350         entry.st_size = e.size()
351
352         entry.st_blksize = 512
353         entry.st_blocks = (e.size()/512)+1
354         entry.st_atime = int(e.atime())
355         entry.st_mtime = int(e.mtime())
356         entry.st_ctime = int(e.mtime())
357
358         return entry
359
360     @catch_exceptions
361     def setattr(self, inode, attr):
362         entry = self.getattr(inode)
363
364         e = self.inodes[inode]
365
366         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
367             with llfuse.lock_released:
368                 e.arvfile.truncate(attr.st_size)
369                 entry.st_size = e.arvfile.size()
370
371         return entry
372
373     @catch_exceptions
374     def lookup(self, parent_inode, name):
375         name = unicode(name, self.encoding)
376         inode = None
377
378         if name == '.':
379             inode = parent_inode
380         else:
381             if parent_inode in self.inodes:
382                 p = self.inodes[parent_inode]
383                 if name == '..':
384                     inode = p.parent_inode
385                 elif isinstance(p, Directory) and name in p:
386                     inode = p[name].inode
387
388         if inode != None:
389             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
390                       parent_inode, name, inode)
391             self.inodes[inode].inc_ref()
392             return self.getattr(inode)
393         else:
394             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
395                       parent_inode, name)
396             raise llfuse.FUSEError(errno.ENOENT)
397
398     @catch_exceptions
399     def forget(self, inodes):
400         for inode, nlookup in inodes:
401             ent = self.inodes[inode]
402             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
403             if ent.dec_ref(nlookup) == 0 and ent.dead:
404                 self.inodes.del_entry(ent)
405
406     @catch_exceptions
407     def open(self, inode, flags):
408         if inode in self.inodes:
409             p = self.inodes[inode]
410         else:
411             raise llfuse.FUSEError(errno.ENOENT)
412
413         if isinstance(p, Directory):
414             raise llfuse.FUSEError(errno.EISDIR)
415
416         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
417             raise llfuse.FUSEError(errno.EPERM)
418
419         fh = next(self._filehandles_counter)
420         self._filehandles[fh] = FileHandle(fh, p)
421         self.inodes.touch(p)
422         return fh
423
424     @catch_exceptions
425     def read(self, fh, off, size):
426         _logger.debug("arv-mount read %i %i %i", fh, off, size)
427         if fh in self._filehandles:
428             handle = self._filehandles[fh]
429         else:
430             raise llfuse.FUSEError(errno.EBADF)
431
432         self.inodes.touch(handle.obj)
433
434         try:
435             return handle.obj.readfrom(off, size, self.num_retries)
436         except arvados.errors.NotFoundError as e:
437             _logger.error("Block not found: " + str(e))
438             raise llfuse.FUSEError(errno.EIO)
439
440     @catch_exceptions
441     def write(self, fh, off, buf):
442         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
443         if fh in self._filehandles:
444             handle = self._filehandles[fh]
445         else:
446             raise llfuse.FUSEError(errno.EBADF)
447
448         if not handle.obj.writable():
449             raise llfuse.FUSEError(errno.EPERM)
450
451         self.inodes.touch(handle.obj)
452
453         return handle.obj.writeto(off, buf, self.num_retries)
454
455     @catch_exceptions
456     def release(self, fh):
457         if fh in self._filehandles:
458             try:
459                 self._filehandles[fh].flush()
460             except EnvironmentError as e:
461                 raise llfuse.FUSEError(e.errno)
462             except Exception:
463                 _logger.exception("Flush error")
464             self._filehandles[fh].release()
465             del self._filehandles[fh]
466         self.inodes.inode_cache.cap_cache()
467
468     def releasedir(self, fh):
469         self.release(fh)
470
471     @catch_exceptions
472     def opendir(self, inode):
473         _logger.debug("arv-mount opendir: inode %i", inode)
474
475         if inode in self.inodes:
476             p = self.inodes[inode]
477         else:
478             raise llfuse.FUSEError(errno.ENOENT)
479
480         if not isinstance(p, Directory):
481             raise llfuse.FUSEError(errno.ENOTDIR)
482
483         fh = next(self._filehandles_counter)
484         if p.parent_inode in self.inodes:
485             parent = self.inodes[p.parent_inode]
486         else:
487             raise llfuse.FUSEError(errno.EIO)
488
489         # update atime
490         self.inodes.touch(p)
491
492         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
493         return fh
494
495     @catch_exceptions
496     def readdir(self, fh, off):
497         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
498
499         if fh in self._filehandles:
500             handle = self._filehandles[fh]
501         else:
502             raise llfuse.FUSEError(errno.EBADF)
503
504         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
505
506         e = off
507         while e < len(handle.entries):
508             if handle.entries[e][1].inode in self.inodes:
509                 try:
510                     yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
511                 except UnicodeEncodeError:
512                     pass
513             e += 1
514
515     @catch_exceptions
516     def statfs(self):
517         st = llfuse.StatvfsData()
518         st.f_bsize = 128 * 1024
519         st.f_blocks = 0
520         st.f_files = 0
521
522         st.f_bfree = 0
523         st.f_bavail = 0
524
525         st.f_ffree = 0
526         st.f_favail = 0
527
528         st.f_frsize = 0
529         return st
530
531     def _check_writable(self, inode_parent):
532         if inode_parent in self.inodes:
533             p = self.inodes[inode_parent]
534         else:
535             raise llfuse.FUSEError(errno.ENOENT)
536
537         if not isinstance(p, Directory):
538             raise llfuse.FUSEError(errno.ENOTDIR)
539
540         if not p.writable():
541             raise llfuse.FUSEError(errno.EPERM)
542
543         return p
544
545     @catch_exceptions
546     def create(self, inode_parent, name, mode, flags, ctx):
547         p = self._check_writable(inode_parent)
548         p.create(name)
549
550         # The file entry should have been implicitly created by callback.
551         f = p[name]
552         fh = next(self._filehandles_counter)
553         self._filehandles[fh] = FileHandle(fh, f)
554         self.inodes.touch(p)
555
556         f.inc_ref()
557         return (fh, self.getattr(f.inode))
558
559     @catch_exceptions
560     def mkdir(self, inode_parent, name, mode, ctx):
561         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
562
563         p = self._check_writable(inode_parent)
564         p.mkdir(name)
565
566         # The dir entry should have been implicitly created by callback.
567         d = p[name]
568
569         d.inc_ref()
570         return self.getattr(d.inode)
571
572     @catch_exceptions
573     def unlink(self, inode_parent, name):
574         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
575         p = self._check_writable(inode_parent)
576         p.unlink(name)
577
578     @catch_exceptions
579     def rmdir(self, inode_parent, name):
580         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
581         p = self._check_writable(inode_parent)
582         p.rmdir(name)
583
584     @catch_exceptions
585     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
586         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
587         src = self._check_writable(inode_parent_old)
588         dest = self._check_writable(inode_parent_new)
589         dest.rename(name_old, name_new, src)
590
591     @catch_exceptions
592     def flush(self, fh):
593         if fh in self._filehandles:
594             self._filehandles[fh].flush()
595
596     def fsync(self, fh, datasync):
597         self.flush(fh)
598
599     def fsyncdir(self, fh, datasync):
600         self.flush(fh)