Merge branch '6643-fuse-del-crash' closes #6643
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69
70 import Queue
71
72 # Default _notify_queue has a limit of 1000 items, but it really needs to be
73 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
74 # details.
75
76 llfuse.capi._notify_queue = Queue.Queue()
77
78 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
79 from fusefile import StringFile, FuseArvadosFile
80
81 _logger = logging.getLogger('arvados.arvados_fuse')
82
83 # Uncomment this to enable llfuse debug logging.
84 # log_handler = logging.StreamHandler()
85 # llogger = logging.getLogger('llfuse')
86 # llogger.addHandler(log_handler)
87 # llogger.setLevel(logging.DEBUG)
88
89 class Handle(object):
90     """Connects a numeric file handle to a File or Directory object that has
91     been opened by the client."""
92
93     def __init__(self, fh, obj):
94         self.fh = fh
95         self.obj = obj
96         self.obj.inc_use()
97
98     def release(self):
99         self.obj.dec_use()
100
101     def flush(self):
102         if self.obj.writable():
103             return self.obj.flush()
104
105
106 class FileHandle(Handle):
107     """Connects a numeric file handle to a File  object that has
108     been opened by the client."""
109     pass
110
111
112 class DirectoryHandle(Handle):
113     """Connects a numeric file handle to a Directory object that has
114     been opened by the client."""
115
116     def __init__(self, fh, dirobj, entries):
117         super(DirectoryHandle, self).__init__(fh, dirobj)
118         self.entries = entries
119
120
121 class InodeCache(object):
122     """Records the memory footprint of objects and when they are last used.
123
124     When the cache limit is exceeded, the least recently used objects are
125     cleared.  Clearing the object means discarding its contents to release
126     memory.  The next time the object is accessed, it must be re-fetched from
127     the server.  Note that the inode cache limit is a soft limit; the cache
128     limit may be exceeded if necessary to load very large objects, it may also
129     be exceeded if open file handles prevent objects from being cleared.
130
131     """
132
133     def __init__(self, cap, min_entries=4):
134         self._entries = collections.OrderedDict()
135         self._by_uuid = {}
136         self._counter = itertools.count(0)
137         self.cap = cap
138         self._total = 0
139         self.min_entries = min_entries
140
141     def total(self):
142         return self._total
143
144     def _remove(self, obj, clear):
145         if clear and not obj.clear():
146             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
147             return False
148         self._total -= obj.cache_size
149         del self._entries[obj.cache_priority]
150         if obj.cache_uuid:
151             self._by_uuid[obj.cache_uuid].remove(obj)
152             if not self._by_uuid[obj.cache_uuid]:
153                 del self._by_uuid[obj.cache_uuid]
154             obj.cache_uuid = None
155         if clear:
156             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
157         return True
158
159     def cap_cache(self):
160         if self._total > self.cap:
161             for key in list(self._entries.keys()):
162                 if self._total < self.cap or len(self._entries) < self.min_entries:
163                     break
164                 self._remove(self._entries[key], True)
165
166     def manage(self, obj):
167         if obj.persisted():
168             obj.cache_priority = next(self._counter)
169             obj.cache_size = obj.objsize()
170             self._entries[obj.cache_priority] = obj
171             obj.cache_uuid = obj.uuid()
172             if obj.cache_uuid:
173                 if obj.cache_uuid not in self._by_uuid:
174                     self._by_uuid[obj.cache_uuid] = [obj]
175                 else:
176                     if obj not in self._by_uuid[obj.cache_uuid]:
177                         self._by_uuid[obj.cache_uuid].append(obj)
178             self._total += obj.objsize()
179             _logger.debug("InodeCache touched %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
180             self.cap_cache()
181         else:
182             obj.cache_priority = None
183
184     def touch(self, obj):
185         if obj.persisted():
186             if obj.cache_priority in self._entries:
187                 self._remove(obj, False)
188             self.manage(obj)
189
190     def unmanage(self, obj):
191         if obj.persisted() and obj.cache_priority in self._entries:
192             self._remove(obj, True)
193
194     def find(self, uuid):
195         return self._by_uuid.get(uuid)
196
197     def clear(self):
198         self._entries.clear()
199         self._by_uuid.clear()
200         self._total = 0
201
202 class Inodes(object):
203     """Manage the set of inodes.  This is the mapping from a numeric id
204     to a concrete File or Directory object"""
205
206     def __init__(self, inode_cache, encoding="utf-8"):
207         self._entries = {}
208         self._counter = itertools.count(llfuse.ROOT_INODE)
209         self.inode_cache = inode_cache
210         self.encoding = encoding
211         self.deferred_invalidations = []
212
213     def __getitem__(self, item):
214         return self._entries[item]
215
216     def __setitem__(self, key, item):
217         self._entries[key] = item
218
219     def __iter__(self):
220         return self._entries.iterkeys()
221
222     def items(self):
223         return self._entries.items()
224
225     def __contains__(self, k):
226         return k in self._entries
227
228     def touch(self, entry):
229         entry._atime = time.time()
230         self.inode_cache.touch(entry)
231
232     def add_entry(self, entry):
233         entry.inode = next(self._counter)
234         if entry.inode == llfuse.ROOT_INODE:
235             entry.inc_ref()
236         self._entries[entry.inode] = entry
237         self.inode_cache.manage(entry)
238         return entry
239
240     def del_entry(self, entry):
241         if entry.ref_count == 0:
242             self.inode_cache.unmanage(entry)
243             del self._entries[entry.inode]
244             with llfuse.lock_released:
245                 entry.finalize()
246             self.invalidate_inode(entry.inode)
247             entry.inode = None
248         else:
249             entry.dead = True
250             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
251
252     def invalidate_inode(self, inode):
253         llfuse.invalidate_inode(inode)
254
255     def invalidate_entry(self, inode, name):
256         llfuse.invalidate_entry(inode, name)
257
258     def clear(self):
259         self.inode_cache.clear()
260
261         for k,v in self._entries.items():
262             try:
263                 v.finalize()
264             except Exception as e:
265                 _logger.exception("Error during finalize of inode %i", k)
266
267         self._entries.clear()
268
269
270 def catch_exceptions(orig_func):
271     """Catch uncaught exceptions and log them consistently."""
272
273     @functools.wraps(orig_func)
274     def catch_exceptions_wrapper(self, *args, **kwargs):
275         try:
276             return orig_func(self, *args, **kwargs)
277         except llfuse.FUSEError:
278             raise
279         except EnvironmentError as e:
280             raise llfuse.FUSEError(e.errno)
281         except arvados.errors.KeepWriteError as e:
282             _logger.error("Keep write error: " + str(e))
283             raise llfuse.FUSEError(errno.EIO)
284         except arvados.errors.NotFoundError as e:
285             _logger.error("Block not found error: " + str(e))
286             raise llfuse.FUSEError(errno.EIO)
287         except:
288             _logger.exception("Unhandled exception during FUSE operation")
289             raise llfuse.FUSEError(errno.EIO)
290
291     return catch_exceptions_wrapper
292
293
294 class Operations(llfuse.Operations):
295     """This is the main interface with llfuse.
296
297     The methods on this object are called by llfuse threads to service FUSE
298     events to query and read from the file system.
299
300     llfuse has its own global lock which is acquired before calling a request handler,
301     so request handlers do not run concurrently unless the lock is explicitly released
302     using 'with llfuse.lock_released:'
303
304     """
305
306     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
307         super(Operations, self).__init__()
308
309         if not inode_cache:
310             inode_cache = InodeCache(cap=256*1024*1024)
311         self.inodes = Inodes(inode_cache, encoding=encoding)
312         self.uid = uid
313         self.gid = gid
314         self.enable_write = enable_write
315
316         # dict of inode to filehandle
317         self._filehandles = {}
318         self._filehandles_counter = itertools.count(0)
319
320         # Other threads that need to wait until the fuse driver
321         # is fully initialized should wait() on this event object.
322         self.initlock = threading.Event()
323
324         self.num_retries = num_retries
325
326         self.events = None
327
328     def init(self):
329         # Allow threads that are waiting for the driver to be finished
330         # initializing to continue
331         self.initlock.set()
332
333     @catch_exceptions
334     def destroy(self):
335         if self.events:
336             self.events.close()
337             self.events = None
338
339         self.inodes.clear()
340
341     def access(self, inode, mode, ctx):
342         return True
343
344     def listen_for_events(self, api_client):
345         self.events = arvados.events.subscribe(api_client,
346                                  [["event_type", "in", ["create", "update", "delete"]]],
347                                  self.on_event)
348
349     @catch_exceptions
350     def on_event(self, ev):
351         if 'event_type' in ev:
352             with llfuse.lock:
353                 item = self.inodes.inode_cache.find(ev["object_uuid"])
354                 if item is not None:
355                     item.invalidate()
356                     if ev["object_kind"] == "arvados#collection":
357                         new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
358
359                         # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
360                         # should always be the same.
361                         #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
362                         record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
363
364                         item.update(to_record_version=record_version)
365                     else:
366                         item.update()
367
368                 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
369                 olditemparent = self.inodes.inode_cache.find(oldowner)
370                 if olditemparent is not None:
371                     olditemparent.invalidate()
372                     olditemparent.update()
373
374                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
375                 if itemparent is not None:
376                     itemparent.invalidate()
377                     itemparent.update()
378
379
380     @catch_exceptions
381     def getattr(self, inode):
382         if inode not in self.inodes:
383             raise llfuse.FUSEError(errno.ENOENT)
384
385         e = self.inodes[inode]
386
387         entry = llfuse.EntryAttributes()
388         entry.st_ino = inode
389         entry.generation = 0
390         entry.entry_timeout = 60
391         entry.attr_timeout = 60
392
393         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
394         if isinstance(e, Directory):
395             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
396         else:
397             entry.st_mode |= stat.S_IFREG
398             if isinstance(e, FuseArvadosFile):
399                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
400
401         if self.enable_write and e.writable():
402             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
403
404         entry.st_nlink = 1
405         entry.st_uid = self.uid
406         entry.st_gid = self.gid
407         entry.st_rdev = 0
408
409         entry.st_size = e.size()
410
411         entry.st_blksize = 512
412         entry.st_blocks = (entry.st_size/512)+1
413         entry.st_atime = int(e.atime())
414         entry.st_mtime = int(e.mtime())
415         entry.st_ctime = int(e.mtime())
416
417         return entry
418
419     @catch_exceptions
420     def setattr(self, inode, attr):
421         entry = self.getattr(inode)
422
423         e = self.inodes[inode]
424
425         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
426             with llfuse.lock_released:
427                 e.arvfile.truncate(attr.st_size)
428                 entry.st_size = e.arvfile.size()
429
430         return entry
431
432     @catch_exceptions
433     def lookup(self, parent_inode, name):
434         name = unicode(name, self.inodes.encoding)
435         inode = None
436
437         if name == '.':
438             inode = parent_inode
439         else:
440             if parent_inode in self.inodes:
441                 p = self.inodes[parent_inode]
442                 if name == '..':
443                     inode = p.parent_inode
444                 elif isinstance(p, Directory) and name in p:
445                     inode = p[name].inode
446
447         if inode != None:
448             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
449                       parent_inode, name, inode)
450             self.inodes[inode].inc_ref()
451             return self.getattr(inode)
452         else:
453             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
454                       parent_inode, name)
455             raise llfuse.FUSEError(errno.ENOENT)
456
457     @catch_exceptions
458     def forget(self, inodes):
459         for inode, nlookup in inodes:
460             ent = self.inodes[inode]
461             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
462             if ent.dec_ref(nlookup) == 0 and ent.dead:
463                 self.inodes.del_entry(ent)
464
465     @catch_exceptions
466     def open(self, inode, flags):
467         if inode in self.inodes:
468             p = self.inodes[inode]
469         else:
470             raise llfuse.FUSEError(errno.ENOENT)
471
472         if isinstance(p, Directory):
473             raise llfuse.FUSEError(errno.EISDIR)
474
475         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
476             raise llfuse.FUSEError(errno.EPERM)
477
478         fh = next(self._filehandles_counter)
479         self._filehandles[fh] = FileHandle(fh, p)
480         self.inodes.touch(p)
481         return fh
482
483     @catch_exceptions
484     def read(self, fh, off, size):
485         _logger.debug("arv-mount read %i %i %i", fh, off, size)
486         if fh in self._filehandles:
487             handle = self._filehandles[fh]
488         else:
489             raise llfuse.FUSEError(errno.EBADF)
490
491         self.inodes.touch(handle.obj)
492
493         return handle.obj.readfrom(off, size, self.num_retries)
494
495     @catch_exceptions
496     def write(self, fh, off, buf):
497         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
498         if fh in self._filehandles:
499             handle = self._filehandles[fh]
500         else:
501             raise llfuse.FUSEError(errno.EBADF)
502
503         if not handle.obj.writable():
504             raise llfuse.FUSEError(errno.EPERM)
505
506         self.inodes.touch(handle.obj)
507
508         return handle.obj.writeto(off, buf, self.num_retries)
509
510     @catch_exceptions
511     def release(self, fh):
512         if fh in self._filehandles:
513             try:
514                 self._filehandles[fh].flush()
515             except Exception:
516                 raise
517             finally:
518                 self._filehandles[fh].release()
519                 del self._filehandles[fh]
520         self.inodes.inode_cache.cap_cache()
521
522     def releasedir(self, fh):
523         self.release(fh)
524
525     @catch_exceptions
526     def opendir(self, inode):
527         _logger.debug("arv-mount opendir: inode %i", inode)
528
529         if inode in self.inodes:
530             p = self.inodes[inode]
531         else:
532             raise llfuse.FUSEError(errno.ENOENT)
533
534         if not isinstance(p, Directory):
535             raise llfuse.FUSEError(errno.ENOTDIR)
536
537         fh = next(self._filehandles_counter)
538         if p.parent_inode in self.inodes:
539             parent = self.inodes[p.parent_inode]
540         else:
541             raise llfuse.FUSEError(errno.EIO)
542
543         # update atime
544         self.inodes.touch(p)
545
546         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
547         return fh
548
549     @catch_exceptions
550     def readdir(self, fh, off):
551         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
552
553         if fh in self._filehandles:
554             handle = self._filehandles[fh]
555         else:
556             raise llfuse.FUSEError(errno.EBADF)
557
558         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
559
560         e = off
561         while e < len(handle.entries):
562             if handle.entries[e][1].inode in self.inodes:
563                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
564             e += 1
565
566     @catch_exceptions
567     def statfs(self):
568         st = llfuse.StatvfsData()
569         st.f_bsize = 128 * 1024
570         st.f_blocks = 0
571         st.f_files = 0
572
573         st.f_bfree = 0
574         st.f_bavail = 0
575
576         st.f_ffree = 0
577         st.f_favail = 0
578
579         st.f_frsize = 0
580         return st
581
582     def _check_writable(self, inode_parent):
583         if not self.enable_write:
584             raise llfuse.FUSEError(errno.EROFS)
585
586         if inode_parent in self.inodes:
587             p = self.inodes[inode_parent]
588         else:
589             raise llfuse.FUSEError(errno.ENOENT)
590
591         if not isinstance(p, Directory):
592             raise llfuse.FUSEError(errno.ENOTDIR)
593
594         if not p.writable():
595             raise llfuse.FUSEError(errno.EPERM)
596
597         return p
598
599     @catch_exceptions
600     def create(self, inode_parent, name, mode, flags, ctx):
601         _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
602
603         p = self._check_writable(inode_parent)
604         p.create(name)
605
606         # The file entry should have been implicitly created by callback.
607         f = p[name]
608         fh = next(self._filehandles_counter)
609         self._filehandles[fh] = FileHandle(fh, f)
610         self.inodes.touch(p)
611
612         f.inc_ref()
613         return (fh, self.getattr(f.inode))
614
615     @catch_exceptions
616     def mkdir(self, inode_parent, name, mode, ctx):
617         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
618
619         p = self._check_writable(inode_parent)
620         p.mkdir(name)
621
622         # The dir entry should have been implicitly created by callback.
623         d = p[name]
624
625         d.inc_ref()
626         return self.getattr(d.inode)
627
628     @catch_exceptions
629     def unlink(self, inode_parent, name):
630         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
631         p = self._check_writable(inode_parent)
632         p.unlink(name)
633
634     @catch_exceptions
635     def rmdir(self, inode_parent, name):
636         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
637         p = self._check_writable(inode_parent)
638         p.rmdir(name)
639
640     @catch_exceptions
641     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
642         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
643         src = self._check_writable(inode_parent_old)
644         dest = self._check_writable(inode_parent_new)
645         dest.rename(name_old, name_new, src)
646
647     @catch_exceptions
648     def flush(self, fh):
649         if fh in self._filehandles:
650             self._filehandles[fh].flush()
651
652     def fsync(self, fh, datasync):
653         self.flush(fh)
654
655     def fsyncdir(self, fh, datasync):
656         self.flush(fh)