8099: 7263: Merge branch 'hgi/7263-even-better-busy-behavior' of github.com:wtsi...
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69 import arvados.keep
70
71 import Queue
72
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
75 # details.
76
77 llfuse.capi._notify_queue = Queue.Queue()
78
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
81
82 _logger = logging.getLogger('arvados.arvados_fuse')
83
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
89
90 class Handle(object):
91     """Connects a numeric file handle to a File or Directory object that has
92     been opened by the client."""
93
94     def __init__(self, fh, obj):
95         self.fh = fh
96         self.obj = obj
97         self.obj.inc_use()
98
99     def release(self):
100         self.obj.dec_use()
101
102     def flush(self):
103         if self.obj.writable():
104             return self.obj.flush()
105
106
107 class FileHandle(Handle):
108     """Connects a numeric file handle to a File  object that has
109     been opened by the client."""
110     pass
111
112
113 class DirectoryHandle(Handle):
114     """Connects a numeric file handle to a Directory object that has
115     been opened by the client."""
116
117     def __init__(self, fh, dirobj, entries):
118         super(DirectoryHandle, self).__init__(fh, dirobj)
119         self.entries = entries
120
121
122 class InodeCache(object):
123     """Records the memory footprint of objects and when they are last used.
124
125     When the cache limit is exceeded, the least recently used objects are
126     cleared.  Clearing the object means discarding its contents to release
127     memory.  The next time the object is accessed, it must be re-fetched from
128     the server.  Note that the inode cache limit is a soft limit; the cache
129     limit may be exceeded if necessary to load very large objects, it may also
130     be exceeded if open file handles prevent objects from being cleared.
131
132     """
133
134     def __init__(self, cap, min_entries=4):
135         self._entries = collections.OrderedDict()
136         self._by_uuid = {}
137         self._counter = itertools.count(0)
138         self.cap = cap
139         self._total = 0
140         self.min_entries = min_entries
141
142     def total(self):
143         return self._total
144
145     def _remove(self, obj, clear):
146         if clear and not obj.clear():
147             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
148             return False
149         self._total -= obj.cache_size
150         del self._entries[obj.cache_priority]
151         if obj.cache_uuid:
152             self._by_uuid[obj.cache_uuid].remove(obj)
153             if not self._by_uuid[obj.cache_uuid]:
154                 del self._by_uuid[obj.cache_uuid]
155             obj.cache_uuid = None
156         if clear:
157             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
158         return True
159
160     def cap_cache(self):
161         if self._total > self.cap:
162             for key in list(self._entries.keys()):
163                 if self._total < self.cap or len(self._entries) < self.min_entries:
164                     break
165                 self._remove(self._entries[key], True)
166
167     def manage(self, obj):
168         if obj.persisted():
169             obj.cache_priority = next(self._counter)
170             obj.cache_size = obj.objsize()
171             self._entries[obj.cache_priority] = obj
172             obj.cache_uuid = obj.uuid()
173             if obj.cache_uuid:
174                 if obj.cache_uuid not in self._by_uuid:
175                     self._by_uuid[obj.cache_uuid] = [obj]
176                 else:
177                     if obj not in self._by_uuid[obj.cache_uuid]:
178                         self._by_uuid[obj.cache_uuid].append(obj)
179             self._total += obj.objsize()
180             _logger.debug("InodeCache touched %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
181             self.cap_cache()
182         else:
183             obj.cache_priority = None
184
185     def touch(self, obj):
186         if obj.persisted():
187             if obj.cache_priority in self._entries:
188                 self._remove(obj, False)
189             self.manage(obj)
190
191     def unmanage(self, obj):
192         if obj.persisted() and obj.cache_priority in self._entries:
193             self._remove(obj, True)
194
195     def find_by_uuid(self, uuid):
196         return self._by_uuid.get(uuid, [])
197
198     def clear(self):
199         self._entries.clear()
200         self._by_uuid.clear()
201         self._total = 0
202
203 class Inodes(object):
204     """Manage the set of inodes.  This is the mapping from a numeric id
205     to a concrete File or Directory object"""
206
207     def __init__(self, inode_cache, encoding="utf-8"):
208         self._entries = {}
209         self._counter = itertools.count(llfuse.ROOT_INODE)
210         self.inode_cache = inode_cache
211         self.encoding = encoding
212         self.deferred_invalidations = []
213
214     def __getitem__(self, item):
215         return self._entries[item]
216
217     def __setitem__(self, key, item):
218         self._entries[key] = item
219
220     def __iter__(self):
221         return self._entries.iterkeys()
222
223     def items(self):
224         return self._entries.items()
225
226     def __contains__(self, k):
227         return k in self._entries
228
229     def touch(self, entry):
230         entry._atime = time.time()
231         self.inode_cache.touch(entry)
232
233     def add_entry(self, entry):
234         entry.inode = next(self._counter)
235         if entry.inode == llfuse.ROOT_INODE:
236             entry.inc_ref()
237         self._entries[entry.inode] = entry
238         self.inode_cache.manage(entry)
239         return entry
240
241     def del_entry(self, entry):
242         if entry.ref_count == 0:
243             self.inode_cache.unmanage(entry)
244             del self._entries[entry.inode]
245             with llfuse.lock_released:
246                 entry.finalize()
247             self.invalidate_inode(entry.inode)
248             entry.inode = None
249         else:
250             entry.dead = True
251             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
252
253     def invalidate_inode(self, inode):
254         llfuse.invalidate_inode(inode)
255
256     def invalidate_entry(self, inode, name):
257         llfuse.invalidate_entry(inode, name.encode(self.encoding))
258
259     def clear(self):
260         self.inode_cache.clear()
261
262         for k,v in self._entries.items():
263             try:
264                 v.finalize()
265             except Exception as e:
266                 _logger.exception("Error during finalize of inode %i", k)
267
268         self._entries.clear()
269
270
271 def catch_exceptions(orig_func):
272     """Catch uncaught exceptions and log them consistently."""
273
274     @functools.wraps(orig_func)
275     def catch_exceptions_wrapper(self, *args, **kwargs):
276         try:
277             return orig_func(self, *args, **kwargs)
278         except llfuse.FUSEError:
279             raise
280         except EnvironmentError as e:
281             raise llfuse.FUSEError(e.errno)
282         except arvados.errors.KeepWriteError as e:
283             _logger.error("Keep write error: " + str(e))
284             raise llfuse.FUSEError(errno.EIO)
285         except arvados.errors.NotFoundError as e:
286             _logger.error("Block not found error: " + str(e))
287             raise llfuse.FUSEError(errno.EIO)
288         except:
289             _logger.exception("Unhandled exception during FUSE operation")
290             raise llfuse.FUSEError(errno.EIO)
291
292     return catch_exceptions_wrapper
293
294
295 class Operations(llfuse.Operations):
296     """This is the main interface with llfuse.
297
298     The methods on this object are called by llfuse threads to service FUSE
299     events to query and read from the file system.
300
301     llfuse has its own global lock which is acquired before calling a request handler,
302     so request handlers do not run concurrently unless the lock is explicitly released
303     using 'with llfuse.lock_released:'
304
305     """
306
307     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
308         super(Operations, self).__init__()
309
310         self._api_client = api_client
311
312         if not inode_cache:
313             inode_cache = InodeCache(cap=256*1024*1024)
314         self.inodes = Inodes(inode_cache, encoding=encoding)
315         self.uid = uid
316         self.gid = gid
317         self.enable_write = enable_write
318
319         # dict of inode to filehandle
320         self._filehandles = {}
321         self._filehandles_counter = itertools.count(0)
322
323         # Other threads that need to wait until the fuse driver
324         # is fully initialized should wait() on this event object.
325         self.initlock = threading.Event()
326
327         # If we get overlapping shutdown events (e.g., fusermount -u
328         # -z and operations.destroy()) llfuse calls forget() on inodes
329         # that have already been deleted. To avoid this, we make
330         # forget() a no-op if called after destroy().
331         self._shutdown_started = threading.Event()
332
333         self.num_retries = num_retries
334
335         self.read_counter = arvados.keep.Counter()
336         self.write_counter = arvados.keep.Counter()
337         self.read_ops_counter = arvados.keep.Counter()
338         self.write_ops_counter = arvados.keep.Counter()
339
340         self.events = None
341
342     def init(self):
343         # Allow threads that are waiting for the driver to be finished
344         # initializing to continue
345         self.initlock.set()
346
347     @catch_exceptions
348     def destroy(self):
349         with llfuse.lock:
350             self._shutdown_started.set()
351             if self.events:
352                 self.events.close()
353                 self.events = None
354
355             self.inodes.clear()
356
357     def access(self, inode, mode, ctx):
358         return True
359
360     def listen_for_events(self):
361         self.events = arvados.events.subscribe(self._api_client,
362                                  [["event_type", "in", ["create", "update", "delete"]]],
363                                  self.on_event)
364
365     @catch_exceptions
366     def on_event(self, ev):
367         if 'event_type' not in ev:
368             return
369         with llfuse.lock:
370             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
371                 item.invalidate()
372                 if ev["object_kind"] == "arvados#collection":
373                     new_attr = (ev.get("properties") and
374                                 ev["properties"].get("new_attributes") and
375                                 ev["properties"]["new_attributes"])
376
377                     # new_attributes.modified_at currently lacks
378                     # subsecond precision (see #6347) so use event_at
379                     # which should always be the same.
380                     record_version = (
381                         (ev["event_at"], new_attr["portable_data_hash"])
382                         if new_attr else None)
383
384                     item.update(to_record_version=record_version)
385                 else:
386                     item.update()
387
388             oldowner = (
389                 ev.get("properties") and
390                 ev["properties"].get("old_attributes") and
391                 ev["properties"]["old_attributes"].get("owner_uuid"))
392             newowner = ev["object_owner_uuid"]
393             for parent in (
394                     self.inodes.inode_cache.find_by_uuid(oldowner) +
395                     self.inodes.inode_cache.find_by_uuid(newowner)):
396                 parent.invalidate()
397                 parent.update()
398
399
400     @catch_exceptions
401     def getattr(self, inode):
402         if inode not in self.inodes:
403             raise llfuse.FUSEError(errno.ENOENT)
404
405         e = self.inodes[inode]
406
407         entry = llfuse.EntryAttributes()
408         entry.st_ino = inode
409         entry.generation = 0
410         entry.entry_timeout = 60 if e.allow_dirent_cache else 0
411         entry.attr_timeout = 60 if e.allow_attr_cache else 0
412
413         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
414         if isinstance(e, Directory):
415             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
416         else:
417             entry.st_mode |= stat.S_IFREG
418             if isinstance(e, FuseArvadosFile):
419                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
420
421         if self.enable_write and e.writable():
422             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
423
424         entry.st_nlink = 1
425         entry.st_uid = self.uid
426         entry.st_gid = self.gid
427         entry.st_rdev = 0
428
429         entry.st_size = e.size()
430
431         entry.st_blksize = 512
432         entry.st_blocks = (entry.st_size/512)+1
433         entry.st_atime = int(e.atime())
434         entry.st_mtime = int(e.mtime())
435         entry.st_ctime = int(e.mtime())
436
437         return entry
438
439     @catch_exceptions
440     def setattr(self, inode, attr):
441         entry = self.getattr(inode)
442
443         e = self.inodes[inode]
444
445         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
446             with llfuse.lock_released:
447                 e.arvfile.truncate(attr.st_size)
448                 entry.st_size = e.arvfile.size()
449
450         return entry
451
452     @catch_exceptions
453     def lookup(self, parent_inode, name):
454         name = unicode(name, self.inodes.encoding)
455         inode = None
456
457         if name == '.':
458             inode = parent_inode
459         else:
460             if parent_inode in self.inodes:
461                 p = self.inodes[parent_inode]
462                 self.inodes.touch(p)
463                 if name == '..':
464                     inode = p.parent_inode
465                 elif isinstance(p, Directory) and name in p:
466                     inode = p[name].inode
467
468         if inode != None:
469             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
470                       parent_inode, name, inode)
471             self.inodes[inode].inc_ref()
472             return self.getattr(inode)
473         else:
474             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
475                       parent_inode, name)
476             raise llfuse.FUSEError(errno.ENOENT)
477
478     @catch_exceptions
479     def forget(self, inodes):
480         if self._shutdown_started.is_set():
481             return
482         for inode, nlookup in inodes:
483             ent = self.inodes[inode]
484             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
485             if ent.dec_ref(nlookup) == 0 and ent.dead:
486                 self.inodes.del_entry(ent)
487
488     @catch_exceptions
489     def open(self, inode, flags):
490         if inode in self.inodes:
491             p = self.inodes[inode]
492         else:
493             raise llfuse.FUSEError(errno.ENOENT)
494
495         if isinstance(p, Directory):
496             raise llfuse.FUSEError(errno.EISDIR)
497
498         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
499             raise llfuse.FUSEError(errno.EPERM)
500
501         fh = next(self._filehandles_counter)
502         self._filehandles[fh] = FileHandle(fh, p)
503         self.inodes.touch(p)
504
505         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
506
507         return fh
508
509     @catch_exceptions
510     def read(self, fh, off, size):
511         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
512         self.read_ops_counter.add(1)
513
514         if fh in self._filehandles:
515             handle = self._filehandles[fh]
516         else:
517             raise llfuse.FUSEError(errno.EBADF)
518
519         self.inodes.touch(handle.obj)
520
521         r = handle.obj.readfrom(off, size, self.num_retries)
522         if r:
523             self.read_counter.add(len(r))
524         return r
525
526     @catch_exceptions
527     def write(self, fh, off, buf):
528         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
529         self.write_ops_counter.add(1)
530
531         if fh in self._filehandles:
532             handle = self._filehandles[fh]
533         else:
534             raise llfuse.FUSEError(errno.EBADF)
535
536         if not handle.obj.writable():
537             raise llfuse.FUSEError(errno.EPERM)
538
539         self.inodes.touch(handle.obj)
540
541         w = handle.obj.writeto(off, buf, self.num_retries)
542         if w:
543             self.write_counter.add(w)
544         return w
545
546     @catch_exceptions
547     def release(self, fh):
548         if fh in self._filehandles:
549             try:
550                 self._filehandles[fh].flush()
551             except Exception:
552                 raise
553             finally:
554                 self._filehandles[fh].release()
555                 del self._filehandles[fh]
556         self.inodes.inode_cache.cap_cache()
557
558     def releasedir(self, fh):
559         self.release(fh)
560
561     @catch_exceptions
562     def opendir(self, inode):
563         _logger.debug("arv-mount opendir: inode %i", inode)
564
565         if inode in self.inodes:
566             p = self.inodes[inode]
567         else:
568             raise llfuse.FUSEError(errno.ENOENT)
569
570         if not isinstance(p, Directory):
571             raise llfuse.FUSEError(errno.ENOTDIR)
572
573         fh = next(self._filehandles_counter)
574         if p.parent_inode in self.inodes:
575             parent = self.inodes[p.parent_inode]
576         else:
577             raise llfuse.FUSEError(errno.EIO)
578
579         # update atime
580         self.inodes.touch(p)
581
582         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
583         return fh
584
585     @catch_exceptions
586     def readdir(self, fh, off):
587         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
588
589         if fh in self._filehandles:
590             handle = self._filehandles[fh]
591         else:
592             raise llfuse.FUSEError(errno.EBADF)
593
594         e = off
595         while e < len(handle.entries):
596             if handle.entries[e][1].inode in self.inodes:
597                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
598             e += 1
599
600     @catch_exceptions
601     def statfs(self):
602         st = llfuse.StatvfsData()
603         st.f_bsize = 128 * 1024
604         st.f_blocks = 0
605         st.f_files = 0
606
607         st.f_bfree = 0
608         st.f_bavail = 0
609
610         st.f_ffree = 0
611         st.f_favail = 0
612
613         st.f_frsize = 0
614         return st
615
616     def _check_writable(self, inode_parent):
617         if not self.enable_write:
618             raise llfuse.FUSEError(errno.EROFS)
619
620         if inode_parent in self.inodes:
621             p = self.inodes[inode_parent]
622         else:
623             raise llfuse.FUSEError(errno.ENOENT)
624
625         if not isinstance(p, Directory):
626             raise llfuse.FUSEError(errno.ENOTDIR)
627
628         if not p.writable():
629             raise llfuse.FUSEError(errno.EPERM)
630
631         return p
632
633     @catch_exceptions
634     def create(self, inode_parent, name, mode, flags, ctx):
635         _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
636
637         p = self._check_writable(inode_parent)
638         p.create(name)
639
640         # The file entry should have been implicitly created by callback.
641         f = p[name]
642         fh = next(self._filehandles_counter)
643         self._filehandles[fh] = FileHandle(fh, f)
644         self.inodes.touch(p)
645
646         f.inc_ref()
647         return (fh, self.getattr(f.inode))
648
649     @catch_exceptions
650     def mkdir(self, inode_parent, name, mode, ctx):
651         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
652
653         p = self._check_writable(inode_parent)
654         p.mkdir(name)
655
656         # The dir entry should have been implicitly created by callback.
657         d = p[name]
658
659         d.inc_ref()
660         return self.getattr(d.inode)
661
662     @catch_exceptions
663     def unlink(self, inode_parent, name):
664         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
665         p = self._check_writable(inode_parent)
666         p.unlink(name)
667
668     @catch_exceptions
669     def rmdir(self, inode_parent, name):
670         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
671         p = self._check_writable(inode_parent)
672         p.rmdir(name)
673
674     @catch_exceptions
675     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
676         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
677         src = self._check_writable(inode_parent_old)
678         dest = self._check_writable(inode_parent_new)
679         dest.rename(name_old, name_new, src)
680
681     @catch_exceptions
682     def flush(self, fh):
683         if fh in self._filehandles:
684             self._filehandles[fh].flush()
685
686     def fsync(self, fh, datasync):
687         self.flush(fh)
688
689     def fsyncdir(self, fh, datasync):
690         self.flush(fh)