7846: Better directory entry invalidation, fixes MagicDirApiError test. Also
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69 import arvados.keep
70
71 import Queue
72
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
75 # details.
76
77 llfuse.capi._notify_queue = Queue.Queue()
78
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
81
82 _logger = logging.getLogger('arvados.arvados_fuse')
83
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
89
90 class Handle(object):
91     """Connects a numeric file handle to a File or Directory object that has
92     been opened by the client."""
93
94     def __init__(self, fh, obj):
95         self.fh = fh
96         self.obj = obj
97         self.obj.inc_use()
98
99     def release(self):
100         self.obj.dec_use()
101
102     def flush(self):
103         if self.obj.writable():
104             return self.obj.flush()
105
106
107 class FileHandle(Handle):
108     """Connects a numeric file handle to a File  object that has
109     been opened by the client."""
110     pass
111
112
113 class DirectoryHandle(Handle):
114     """Connects a numeric file handle to a Directory object that has
115     been opened by the client."""
116
117     def __init__(self, fh, dirobj, entries):
118         super(DirectoryHandle, self).__init__(fh, dirobj)
119         self.entries = entries
120
121
122 class InodeCache(object):
123     """Records the memory footprint of objects and when they are last used.
124
125     When the cache limit is exceeded, the least recently used objects are
126     cleared.  Clearing the object means discarding its contents to release
127     memory.  The next time the object is accessed, it must be re-fetched from
128     the server.  Note that the inode cache limit is a soft limit; the cache
129     limit may be exceeded if necessary to load very large objects, it may also
130     be exceeded if open file handles prevent objects from being cleared.
131
132     """
133
134     def __init__(self, cap, min_entries=4):
135         self._entries = collections.OrderedDict()
136         self._by_uuid = {}
137         self._counter = itertools.count(0)
138         self.cap = cap
139         self._total = 0
140         self.min_entries = min_entries
141
142     def total(self):
143         return self._total
144
145     def _remove(self, obj, clear):
146         if clear and not obj.clear():
147             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
148             return False
149         self._total -= obj.cache_size
150         del self._entries[obj.cache_priority]
151         if obj.cache_uuid:
152             self._by_uuid[obj.cache_uuid].remove(obj)
153             if not self._by_uuid[obj.cache_uuid]:
154                 del self._by_uuid[obj.cache_uuid]
155             obj.cache_uuid = None
156         if clear:
157             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
158         return True
159
160     def cap_cache(self):
161         if self._total > self.cap:
162             for key in list(self._entries.keys()):
163                 if self._total < self.cap or len(self._entries) < self.min_entries:
164                     break
165                 self._remove(self._entries[key], True)
166
167     def manage(self, obj):
168         if obj.persisted():
169             obj.cache_priority = next(self._counter)
170             obj.cache_size = obj.objsize()
171             self._entries[obj.cache_priority] = obj
172             obj.cache_uuid = obj.uuid()
173             if obj.cache_uuid:
174                 if obj.cache_uuid not in self._by_uuid:
175                     self._by_uuid[obj.cache_uuid] = [obj]
176                 else:
177                     if obj not in self._by_uuid[obj.cache_uuid]:
178                         self._by_uuid[obj.cache_uuid].append(obj)
179             self._total += obj.objsize()
180             _logger.debug("InodeCache touched %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
181             self.cap_cache()
182         else:
183             obj.cache_priority = None
184
185     def touch(self, obj):
186         if obj.persisted():
187             if obj.cache_priority in self._entries:
188                 self._remove(obj, False)
189             self.manage(obj)
190
191     def unmanage(self, obj):
192         if obj.persisted() and obj.cache_priority in self._entries:
193             self._remove(obj, True)
194
195     def find_by_uuid(self, uuid):
196         return self._by_uuid.get(uuid, [])
197
198     def clear(self):
199         self._entries.clear()
200         self._by_uuid.clear()
201         self._total = 0
202
203 class Inodes(object):
204     """Manage the set of inodes.  This is the mapping from a numeric id
205     to a concrete File or Directory object"""
206
207     def __init__(self, inode_cache, encoding="utf-8"):
208         self._entries = {}
209         self._counter = itertools.count(llfuse.ROOT_INODE)
210         self.inode_cache = inode_cache
211         self.encoding = encoding
212         self.deferred_invalidations = []
213
214     def __getitem__(self, item):
215         return self._entries[item]
216
217     def __setitem__(self, key, item):
218         self._entries[key] = item
219
220     def __iter__(self):
221         return self._entries.iterkeys()
222
223     def items(self):
224         return self._entries.items()
225
226     def __contains__(self, k):
227         return k in self._entries
228
229     def touch(self, entry):
230         entry._atime = time.time()
231         self.inode_cache.touch(entry)
232
233     def add_entry(self, entry):
234         entry.inode = next(self._counter)
235         if entry.inode == llfuse.ROOT_INODE:
236             entry.inc_ref()
237         self._entries[entry.inode] = entry
238         self.inode_cache.manage(entry)
239         return entry
240
241     def del_entry(self, entry):
242         if entry.ref_count == 0:
243             self.inode_cache.unmanage(entry)
244             del self._entries[entry.inode]
245             with llfuse.lock_released:
246                 entry.finalize()
247             self.invalidate_inode(entry.inode)
248             entry.inode = None
249         else:
250             entry.dead = True
251             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
252
253     def invalidate_inode(self, inode):
254         llfuse.invalidate_inode(inode)
255
256     def invalidate_entry(self, inode, name):
257         llfuse.invalidate_entry(inode, name.encode(self.encoding))
258
259     def clear(self):
260         self.inode_cache.clear()
261
262         for k,v in self._entries.items():
263             try:
264                 v.finalize()
265             except Exception as e:
266                 _logger.exception("Error during finalize of inode %i", k)
267
268         self._entries.clear()
269
270
271 def catch_exceptions(orig_func):
272     """Catch uncaught exceptions and log them consistently."""
273
274     @functools.wraps(orig_func)
275     def catch_exceptions_wrapper(self, *args, **kwargs):
276         try:
277             return orig_func(self, *args, **kwargs)
278         except llfuse.FUSEError:
279             raise
280         except EnvironmentError as e:
281             raise llfuse.FUSEError(e.errno)
282         except arvados.errors.KeepWriteError as e:
283             _logger.error("Keep write error: " + str(e))
284             raise llfuse.FUSEError(errno.EIO)
285         except arvados.errors.NotFoundError as e:
286             _logger.error("Block not found error: " + str(e))
287             raise llfuse.FUSEError(errno.EIO)
288         except:
289             _logger.exception("Unhandled exception during FUSE operation")
290             raise llfuse.FUSEError(errno.EIO)
291
292     return catch_exceptions_wrapper
293
294
295 class Operations(llfuse.Operations):
296     """This is the main interface with llfuse.
297
298     The methods on this object are called by llfuse threads to service FUSE
299     events to query and read from the file system.
300
301     llfuse has its own global lock which is acquired before calling a request handler,
302     so request handlers do not run concurrently unless the lock is explicitly released
303     using 'with llfuse.lock_released:'
304
305     """
306
307     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
308         super(Operations, self).__init__()
309
310         self._api_client = api_client
311
312         if not inode_cache:
313             inode_cache = InodeCache(cap=256*1024*1024)
314         self.inodes = Inodes(inode_cache, encoding=encoding)
315         self.uid = uid
316         self.gid = gid
317         self.enable_write = enable_write
318
319         # dict of inode to filehandle
320         self._filehandles = {}
321         self._filehandles_counter = itertools.count(0)
322
323         # Other threads that need to wait until the fuse driver
324         # is fully initialized should wait() on this event object.
325         self.initlock = threading.Event()
326
327         # If we get overlapping shutdown events (e.g., fusermount -u
328         # -z and operations.destroy()) llfuse calls forget() on inodes
329         # that have already been deleted. To avoid this, we make
330         # forget() a no-op if called after destroy().
331         self._shutdown_started = threading.Event()
332
333         self.num_retries = num_retries
334
335         self.read_counter = arvados.keep.Counter()
336         self.write_counter = arvados.keep.Counter()
337         self.read_ops_counter = arvados.keep.Counter()
338         self.write_ops_counter = arvados.keep.Counter()
339
340         self.events = None
341
342     def init(self):
343         # Allow threads that are waiting for the driver to be finished
344         # initializing to continue
345         self.initlock.set()
346
347     @catch_exceptions
348     def destroy(self):
349         with llfuse.lock:
350             self._shutdown_started.set()
351             if self.events:
352                 self.events.close()
353                 self.events = None
354
355             self.inodes.clear()
356
357     def access(self, inode, mode, ctx):
358         return True
359
360     def listen_for_events(self):
361         self.events = arvados.events.subscribe(self._api_client,
362                                  [["event_type", "in", ["create", "update", "delete"]]],
363                                  self.on_event)
364
365     @catch_exceptions
366     def on_event(self, ev):
367         if 'event_type' not in ev:
368             return
369         with llfuse.lock:
370             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
371                 item.invalidate()
372                 if ev["object_kind"] == "arvados#collection":
373                     new_attr = (ev.get("properties") and
374                                 ev["properties"].get("new_attributes") and
375                                 ev["properties"]["new_attributes"])
376
377                     # new_attributes.modified_at currently lacks
378                     # subsecond precision (see #6347) so use event_at
379                     # which should always be the same.
380                     record_version = (
381                         (ev["event_at"], new_attr["portable_data_hash"])
382                         if new_attr else None)
383
384                     item.update(to_record_version=record_version)
385                 else:
386                     item.update()
387
388             oldowner = (
389                 ev.get("properties") and
390                 ev["properties"].get("old_attributes") and
391                 ev["properties"]["old_attributes"].get("owner_uuid"))
392             newowner = ev["object_owner_uuid"]
393             for parent in (
394                     self.inodes.inode_cache.find_by_uuid(oldowner) +
395                     self.inodes.inode_cache.find_by_uuid(newowner)):
396                 parent.invalidate()
397                 parent.update()
398
399
400     @catch_exceptions
401     def getattr(self, inode):
402         if inode not in self.inodes:
403             raise llfuse.FUSEError(errno.ENOENT)
404
405         e = self.inodes[inode]
406
407         entry = llfuse.EntryAttributes()
408         entry.st_ino = inode
409         entry.generation = 0
410         entry.entry_timeout = 60 if e.allow_dirent_cache else 0
411         entry.attr_timeout = 60 if e.allow_attr_cache else 0
412
413         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
414         if isinstance(e, Directory):
415             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
416         else:
417             entry.st_mode |= stat.S_IFREG
418             if isinstance(e, FuseArvadosFile):
419                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
420
421         if self.enable_write and e.writable():
422             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
423
424         entry.st_nlink = 1
425         entry.st_uid = self.uid
426         entry.st_gid = self.gid
427         entry.st_rdev = 0
428
429         entry.st_size = e.size()
430
431         entry.st_blksize = 512
432         entry.st_blocks = (entry.st_size/512)+1
433         entry.st_atime = int(e.atime())
434         entry.st_mtime = int(e.mtime())
435         entry.st_ctime = int(e.mtime())
436
437         return entry
438
439     @catch_exceptions
440     def setattr(self, inode, attr):
441         entry = self.getattr(inode)
442
443         e = self.inodes[inode]
444
445         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
446             with llfuse.lock_released:
447                 e.arvfile.truncate(attr.st_size)
448                 entry.st_size = e.arvfile.size()
449
450         return entry
451
452     @catch_exceptions
453     def lookup(self, parent_inode, name):
454         name = unicode(name, self.inodes.encoding)
455         inode = None
456
457         if name == '.':
458             inode = parent_inode
459         else:
460             if parent_inode in self.inodes:
461                 p = self.inodes[parent_inode]
462                 if name == '..':
463                     inode = p.parent_inode
464                 elif isinstance(p, Directory) and name in p:
465                     inode = p[name].inode
466
467         if inode != None:
468             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
469                       parent_inode, name, inode)
470             self.inodes[inode].inc_ref()
471             return self.getattr(inode)
472         else:
473             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
474                       parent_inode, name)
475             raise llfuse.FUSEError(errno.ENOENT)
476
477     @catch_exceptions
478     def forget(self, inodes):
479         if self._shutdown_started.is_set():
480             return
481         for inode, nlookup in inodes:
482             ent = self.inodes[inode]
483             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
484             if ent.dec_ref(nlookup) == 0 and ent.dead:
485                 self.inodes.del_entry(ent)
486
487     @catch_exceptions
488     def open(self, inode, flags):
489         if inode in self.inodes:
490             p = self.inodes[inode]
491         else:
492             raise llfuse.FUSEError(errno.ENOENT)
493
494         if isinstance(p, Directory):
495             raise llfuse.FUSEError(errno.EISDIR)
496
497         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
498             raise llfuse.FUSEError(errno.EPERM)
499
500         fh = next(self._filehandles_counter)
501         self._filehandles[fh] = FileHandle(fh, p)
502         self.inodes.touch(p)
503         return fh
504
505     @catch_exceptions
506     def read(self, fh, off, size):
507         _logger.debug("arv-mount read %i %i %i", fh, off, size)
508         self.read_ops_counter.add(1)
509
510         if fh in self._filehandles:
511             handle = self._filehandles[fh]
512         else:
513             raise llfuse.FUSEError(errno.EBADF)
514
515         self.inodes.touch(handle.obj)
516
517         r = handle.obj.readfrom(off, size, self.num_retries)
518         if r:
519             self.read_counter.add(len(r))
520         return r
521
522     @catch_exceptions
523     def write(self, fh, off, buf):
524         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
525         self.write_ops_counter.add(1)
526
527         if fh in self._filehandles:
528             handle = self._filehandles[fh]
529         else:
530             raise llfuse.FUSEError(errno.EBADF)
531
532         if not handle.obj.writable():
533             raise llfuse.FUSEError(errno.EPERM)
534
535         self.inodes.touch(handle.obj)
536
537         w = handle.obj.writeto(off, buf, self.num_retries)
538         if w:
539             self.write_counter.add(w)
540         return w
541
542     @catch_exceptions
543     def release(self, fh):
544         if fh in self._filehandles:
545             try:
546                 self._filehandles[fh].flush()
547             except Exception:
548                 raise
549             finally:
550                 self._filehandles[fh].release()
551                 del self._filehandles[fh]
552         self.inodes.inode_cache.cap_cache()
553
554     def releasedir(self, fh):
555         self.release(fh)
556
557     @catch_exceptions
558     def opendir(self, inode):
559         _logger.debug("arv-mount opendir: inode %i", inode)
560
561         if inode in self.inodes:
562             p = self.inodes[inode]
563         else:
564             raise llfuse.FUSEError(errno.ENOENT)
565
566         if not isinstance(p, Directory):
567             raise llfuse.FUSEError(errno.ENOTDIR)
568
569         fh = next(self._filehandles_counter)
570         if p.parent_inode in self.inodes:
571             parent = self.inodes[p.parent_inode]
572         else:
573             raise llfuse.FUSEError(errno.EIO)
574
575         # update atime
576         self.inodes.touch(p)
577
578         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
579         return fh
580
581     @catch_exceptions
582     def readdir(self, fh, off):
583         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
584
585         if fh in self._filehandles:
586             handle = self._filehandles[fh]
587         else:
588             raise llfuse.FUSEError(errno.EBADF)
589
590         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
591
592         e = off
593         while e < len(handle.entries):
594             if handle.entries[e][1].inode in self.inodes:
595                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
596             e += 1
597
598     @catch_exceptions
599     def statfs(self):
600         st = llfuse.StatvfsData()
601         st.f_bsize = 128 * 1024
602         st.f_blocks = 0
603         st.f_files = 0
604
605         st.f_bfree = 0
606         st.f_bavail = 0
607
608         st.f_ffree = 0
609         st.f_favail = 0
610
611         st.f_frsize = 0
612         return st
613
614     def _check_writable(self, inode_parent):
615         if not self.enable_write:
616             raise llfuse.FUSEError(errno.EROFS)
617
618         if inode_parent in self.inodes:
619             p = self.inodes[inode_parent]
620         else:
621             raise llfuse.FUSEError(errno.ENOENT)
622
623         if not isinstance(p, Directory):
624             raise llfuse.FUSEError(errno.ENOTDIR)
625
626         if not p.writable():
627             raise llfuse.FUSEError(errno.EPERM)
628
629         return p
630
631     @catch_exceptions
632     def create(self, inode_parent, name, mode, flags, ctx):
633         _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
634
635         p = self._check_writable(inode_parent)
636         p.create(name)
637
638         # The file entry should have been implicitly created by callback.
639         f = p[name]
640         fh = next(self._filehandles_counter)
641         self._filehandles[fh] = FileHandle(fh, f)
642         self.inodes.touch(p)
643
644         f.inc_ref()
645         return (fh, self.getattr(f.inode))
646
647     @catch_exceptions
648     def mkdir(self, inode_parent, name, mode, ctx):
649         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
650
651         p = self._check_writable(inode_parent)
652         p.mkdir(name)
653
654         # The dir entry should have been implicitly created by callback.
655         d = p[name]
656
657         d.inc_ref()
658         return self.getattr(d.inode)
659
660     @catch_exceptions
661     def unlink(self, inode_parent, name):
662         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
663         p = self._check_writable(inode_parent)
664         p.unlink(name)
665
666     @catch_exceptions
667     def rmdir(self, inode_parent, name):
668         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
669         p = self._check_writable(inode_parent)
670         p.rmdir(name)
671
672     @catch_exceptions
673     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
674         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
675         src = self._check_writable(inode_parent_old)
676         dest = self._check_writable(inode_parent_new)
677         dest.rename(name_old, name_new, src)
678
679     @catch_exceptions
680     def flush(self, fh):
681         if fh in self._filehandles:
682             self._filehandles[fh].flush()
683
684     def fsync(self, fh, datasync):
685         self.flush(fh)
686
687     def fsyncdir(self, fh, datasync):
688         self.flush(fh)