8460: Hide *websocket.Conn behind interface.
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69 import arvados.keep
70
71 import Queue
72
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
75 # details.
76
77 llfuse.capi._notify_queue = Queue.Queue()
78
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
81
82 _logger = logging.getLogger('arvados.arvados_fuse')
83
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
89
90 class Handle(object):
91     """Connects a numeric file handle to a File or Directory object that has
92     been opened by the client."""
93
94     def __init__(self, fh, obj):
95         self.fh = fh
96         self.obj = obj
97         self.obj.inc_use()
98
99     def release(self):
100         self.obj.dec_use()
101
102     def flush(self):
103         if self.obj.writable():
104             return self.obj.flush()
105
106
107 class FileHandle(Handle):
108     """Connects a numeric file handle to a File  object that has
109     been opened by the client."""
110     pass
111
112
113 class DirectoryHandle(Handle):
114     """Connects a numeric file handle to a Directory object that has
115     been opened by the client."""
116
117     def __init__(self, fh, dirobj, entries):
118         super(DirectoryHandle, self).__init__(fh, dirobj)
119         self.entries = entries
120
121
122 class InodeCache(object):
123     """Records the memory footprint of objects and when they are last used.
124
125     When the cache limit is exceeded, the least recently used objects are
126     cleared.  Clearing the object means discarding its contents to release
127     memory.  The next time the object is accessed, it must be re-fetched from
128     the server.  Note that the inode cache limit is a soft limit; the cache
129     limit may be exceeded if necessary to load very large objects, it may also
130     be exceeded if open file handles prevent objects from being cleared.
131
132     """
133
134     def __init__(self, cap, min_entries=4):
135         self._entries = collections.OrderedDict()
136         self._by_uuid = {}
137         self.cap = cap
138         self._total = 0
139         self.min_entries = min_entries
140
141     def total(self):
142         return self._total
143
144     def _remove(self, obj, clear):
145         if clear:
146             if obj.in_use():
147                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
148                 return
149             if obj.has_ref(True):
150                 obj.kernel_invalidate()
151                 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
152                 return
153             obj.clear()
154
155         # The llfuse lock is released in del_entry(), which is called by
156         # Directory.clear().  While the llfuse lock is released, it can happen
157         # that a reentrant call removes this entry before this call gets to it.
158         # Ensure that the entry is still valid before trying to remove it.
159         if obj.inode not in self._entries:
160             return
161
162         self._total -= obj.cache_size
163         del self._entries[obj.inode]
164         if obj.cache_uuid:
165             self._by_uuid[obj.cache_uuid].remove(obj)
166             if not self._by_uuid[obj.cache_uuid]:
167                 del self._by_uuid[obj.cache_uuid]
168             obj.cache_uuid = None
169         if clear:
170             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
171
172     def cap_cache(self):
173         if self._total > self.cap:
174             for ent in self._entries.values():
175                 if self._total < self.cap or len(self._entries) < self.min_entries:
176                     break
177                 self._remove(ent, True)
178
179     def manage(self, obj):
180         if obj.persisted():
181             obj.cache_size = obj.objsize()
182             self._entries[obj.inode] = obj
183             obj.cache_uuid = obj.uuid()
184             if obj.cache_uuid:
185                 if obj.cache_uuid not in self._by_uuid:
186                     self._by_uuid[obj.cache_uuid] = [obj]
187                 else:
188                     if obj not in self._by_uuid[obj.cache_uuid]:
189                         self._by_uuid[obj.cache_uuid].append(obj)
190             self._total += obj.objsize()
191             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
192             self.cap_cache()
193
194     def touch(self, obj):
195         if obj.persisted():
196             if obj.inode in self._entries:
197                 self._remove(obj, False)
198             self.manage(obj)
199
200     def unmanage(self, obj):
201         if obj.persisted() and obj.inode in self._entries:
202             self._remove(obj, True)
203
204     def find_by_uuid(self, uuid):
205         return self._by_uuid.get(uuid, [])
206
207     def clear(self):
208         self._entries.clear()
209         self._by_uuid.clear()
210         self._total = 0
211
212 class Inodes(object):
213     """Manage the set of inodes.  This is the mapping from a numeric id
214     to a concrete File or Directory object"""
215
216     def __init__(self, inode_cache, encoding="utf-8"):
217         self._entries = {}
218         self._counter = itertools.count(llfuse.ROOT_INODE)
219         self.inode_cache = inode_cache
220         self.encoding = encoding
221         self.deferred_invalidations = []
222
223     def __getitem__(self, item):
224         return self._entries[item]
225
226     def __setitem__(self, key, item):
227         self._entries[key] = item
228
229     def __iter__(self):
230         return self._entries.iterkeys()
231
232     def items(self):
233         return self._entries.items()
234
235     def __contains__(self, k):
236         return k in self._entries
237
238     def touch(self, entry):
239         entry._atime = time.time()
240         self.inode_cache.touch(entry)
241
242     def add_entry(self, entry):
243         entry.inode = next(self._counter)
244         if entry.inode == llfuse.ROOT_INODE:
245             entry.inc_ref()
246         self._entries[entry.inode] = entry
247         self.inode_cache.manage(entry)
248         return entry
249
250     def del_entry(self, entry):
251         if entry.ref_count == 0:
252             self.inode_cache.unmanage(entry)
253             del self._entries[entry.inode]
254             with llfuse.lock_released:
255                 entry.finalize()
256             self.invalidate_inode(entry.inode)
257             entry.inode = None
258         else:
259             entry.dead = True
260             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
261
262     def invalidate_inode(self, inode):
263         llfuse.invalidate_inode(inode)
264
265     def invalidate_entry(self, inode, name):
266         llfuse.invalidate_entry(inode, name.encode(self.encoding))
267
268     def clear(self):
269         self.inode_cache.clear()
270
271         for k,v in self._entries.items():
272             try:
273                 v.finalize()
274             except Exception as e:
275                 _logger.exception("Error during finalize of inode %i", k)
276
277         self._entries.clear()
278
279
280 def catch_exceptions(orig_func):
281     """Catch uncaught exceptions and log them consistently."""
282
283     @functools.wraps(orig_func)
284     def catch_exceptions_wrapper(self, *args, **kwargs):
285         try:
286             return orig_func(self, *args, **kwargs)
287         except llfuse.FUSEError:
288             raise
289         except EnvironmentError as e:
290             raise llfuse.FUSEError(e.errno)
291         except arvados.errors.KeepWriteError as e:
292             _logger.error("Keep write error: " + str(e))
293             raise llfuse.FUSEError(errno.EIO)
294         except arvados.errors.NotFoundError as e:
295             _logger.error("Block not found error: " + str(e))
296             raise llfuse.FUSEError(errno.EIO)
297         except:
298             _logger.exception("Unhandled exception during FUSE operation")
299             raise llfuse.FUSEError(errno.EIO)
300
301     return catch_exceptions_wrapper
302
303
304 class Operations(llfuse.Operations):
305     """This is the main interface with llfuse.
306
307     The methods on this object are called by llfuse threads to service FUSE
308     events to query and read from the file system.
309
310     llfuse has its own global lock which is acquired before calling a request handler,
311     so request handlers do not run concurrently unless the lock is explicitly released
312     using 'with llfuse.lock_released:'
313
314     """
315
316     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
317         super(Operations, self).__init__()
318
319         self._api_client = api_client
320
321         if not inode_cache:
322             inode_cache = InodeCache(cap=256*1024*1024)
323         self.inodes = Inodes(inode_cache, encoding=encoding)
324         self.uid = uid
325         self.gid = gid
326         self.enable_write = enable_write
327
328         # dict of inode to filehandle
329         self._filehandles = {}
330         self._filehandles_counter = itertools.count(0)
331
332         # Other threads that need to wait until the fuse driver
333         # is fully initialized should wait() on this event object.
334         self.initlock = threading.Event()
335
336         # If we get overlapping shutdown events (e.g., fusermount -u
337         # -z and operations.destroy()) llfuse calls forget() on inodes
338         # that have already been deleted. To avoid this, we make
339         # forget() a no-op if called after destroy().
340         self._shutdown_started = threading.Event()
341
342         self.num_retries = num_retries
343
344         self.read_counter = arvados.keep.Counter()
345         self.write_counter = arvados.keep.Counter()
346         self.read_ops_counter = arvados.keep.Counter()
347         self.write_ops_counter = arvados.keep.Counter()
348
349         self.events = None
350
351     def init(self):
352         # Allow threads that are waiting for the driver to be finished
353         # initializing to continue
354         self.initlock.set()
355
356     @catch_exceptions
357     def destroy(self):
358         with llfuse.lock:
359             self._shutdown_started.set()
360             if self.events:
361                 self.events.close()
362                 self.events = None
363
364             self.inodes.clear()
365
366     def access(self, inode, mode, ctx):
367         return True
368
369     def listen_for_events(self):
370         self.events = arvados.events.subscribe(
371             self._api_client,
372             [["event_type", "in", ["create", "update", "delete"]]],
373             self.on_event)
374
375     @catch_exceptions
376     def on_event(self, ev):
377         if 'event_type' not in ev:
378             return
379         with llfuse.lock:
380             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
381                 item.invalidate()
382                 if ev["object_kind"] == "arvados#collection":
383                     new_attr = (ev.get("properties") and
384                                 ev["properties"].get("new_attributes") and
385                                 ev["properties"]["new_attributes"])
386
387                     # new_attributes.modified_at currently lacks
388                     # subsecond precision (see #6347) so use event_at
389                     # which should always be the same.
390                     record_version = (
391                         (ev["event_at"], new_attr["portable_data_hash"])
392                         if new_attr else None)
393
394                     item.update(to_record_version=record_version)
395                 else:
396                     item.update()
397
398             oldowner = (
399                 ev.get("properties") and
400                 ev["properties"].get("old_attributes") and
401                 ev["properties"]["old_attributes"].get("owner_uuid"))
402             newowner = ev["object_owner_uuid"]
403             for parent in (
404                     self.inodes.inode_cache.find_by_uuid(oldowner) +
405                     self.inodes.inode_cache.find_by_uuid(newowner)):
406                 parent.invalidate()
407                 parent.update()
408
409
410     @catch_exceptions
411     def getattr(self, inode):
412         if inode not in self.inodes:
413             raise llfuse.FUSEError(errno.ENOENT)
414
415         e = self.inodes[inode]
416
417         entry = llfuse.EntryAttributes()
418         entry.st_ino = inode
419         entry.generation = 0
420         entry.entry_timeout = 60 if e.allow_dirent_cache else 0
421         entry.attr_timeout = 60 if e.allow_attr_cache else 0
422
423         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
424         if isinstance(e, Directory):
425             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
426         else:
427             entry.st_mode |= stat.S_IFREG
428             if isinstance(e, FuseArvadosFile):
429                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
430
431         if self.enable_write and e.writable():
432             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
433
434         entry.st_nlink = 1
435         entry.st_uid = self.uid
436         entry.st_gid = self.gid
437         entry.st_rdev = 0
438
439         entry.st_size = e.size()
440
441         entry.st_blksize = 512
442         entry.st_blocks = (entry.st_size/512)+1
443         entry.st_atime = int(e.atime())
444         entry.st_mtime = int(e.mtime())
445         entry.st_ctime = int(e.mtime())
446
447         return entry
448
449     @catch_exceptions
450     def setattr(self, inode, attr):
451         entry = self.getattr(inode)
452
453         e = self.inodes[inode]
454
455         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
456             with llfuse.lock_released:
457                 e.arvfile.truncate(attr.st_size)
458                 entry.st_size = e.arvfile.size()
459
460         return entry
461
462     @catch_exceptions
463     def lookup(self, parent_inode, name):
464         name = unicode(name, self.inodes.encoding)
465         inode = None
466
467         if name == '.':
468             inode = parent_inode
469         else:
470             if parent_inode in self.inodes:
471                 p = self.inodes[parent_inode]
472                 self.inodes.touch(p)
473                 if name == '..':
474                     inode = p.parent_inode
475                 elif isinstance(p, Directory) and name in p:
476                     inode = p[name].inode
477
478         if inode != None:
479             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
480                       parent_inode, name, inode)
481             self.inodes[inode].inc_ref()
482             return self.getattr(inode)
483         else:
484             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
485                       parent_inode, name)
486             raise llfuse.FUSEError(errno.ENOENT)
487
488     @catch_exceptions
489     def forget(self, inodes):
490         if self._shutdown_started.is_set():
491             return
492         for inode, nlookup in inodes:
493             ent = self.inodes[inode]
494             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
495             if ent.dec_ref(nlookup) == 0 and ent.dead:
496                 self.inodes.del_entry(ent)
497
498     @catch_exceptions
499     def open(self, inode, flags):
500         if inode in self.inodes:
501             p = self.inodes[inode]
502         else:
503             raise llfuse.FUSEError(errno.ENOENT)
504
505         if isinstance(p, Directory):
506             raise llfuse.FUSEError(errno.EISDIR)
507
508         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
509             raise llfuse.FUSEError(errno.EPERM)
510
511         fh = next(self._filehandles_counter)
512         self._filehandles[fh] = FileHandle(fh, p)
513         self.inodes.touch(p)
514
515         # Normally, we will have received an "update" event if the
516         # parent collection is stale here. However, even if the parent
517         # collection hasn't changed, the manifest might have been
518         # fetched so long ago that the signatures on the data block
519         # locators have expired. Calling checkupdate() on all
520         # ancestors ensures the signatures will be refreshed if
521         # necessary.
522         while p.parent_inode in self.inodes:
523             if p == self.inodes[p.parent_inode]:
524                 break
525             p = self.inodes[p.parent_inode]
526             self.inodes.touch(p)
527             p.checkupdate()
528
529         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
530
531         return fh
532
533     @catch_exceptions
534     def read(self, fh, off, size):
535         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
536         self.read_ops_counter.add(1)
537
538         if fh in self._filehandles:
539             handle = self._filehandles[fh]
540         else:
541             raise llfuse.FUSEError(errno.EBADF)
542
543         self.inodes.touch(handle.obj)
544
545         r = handle.obj.readfrom(off, size, self.num_retries)
546         if r:
547             self.read_counter.add(len(r))
548         return r
549
550     @catch_exceptions
551     def write(self, fh, off, buf):
552         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
553         self.write_ops_counter.add(1)
554
555         if fh in self._filehandles:
556             handle = self._filehandles[fh]
557         else:
558             raise llfuse.FUSEError(errno.EBADF)
559
560         if not handle.obj.writable():
561             raise llfuse.FUSEError(errno.EPERM)
562
563         self.inodes.touch(handle.obj)
564
565         w = handle.obj.writeto(off, buf, self.num_retries)
566         if w:
567             self.write_counter.add(w)
568         return w
569
570     @catch_exceptions
571     def release(self, fh):
572         if fh in self._filehandles:
573             try:
574                 self._filehandles[fh].flush()
575             except Exception:
576                 raise
577             finally:
578                 self._filehandles[fh].release()
579                 del self._filehandles[fh]
580         self.inodes.inode_cache.cap_cache()
581
582     def releasedir(self, fh):
583         self.release(fh)
584
585     @catch_exceptions
586     def opendir(self, inode):
587         _logger.debug("arv-mount opendir: inode %i", inode)
588
589         if inode in self.inodes:
590             p = self.inodes[inode]
591         else:
592             raise llfuse.FUSEError(errno.ENOENT)
593
594         if not isinstance(p, Directory):
595             raise llfuse.FUSEError(errno.ENOTDIR)
596
597         fh = next(self._filehandles_counter)
598         if p.parent_inode in self.inodes:
599             parent = self.inodes[p.parent_inode]
600         else:
601             raise llfuse.FUSEError(errno.EIO)
602
603         # update atime
604         self.inodes.touch(p)
605
606         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
607         return fh
608
609     @catch_exceptions
610     def readdir(self, fh, off):
611         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
612
613         if fh in self._filehandles:
614             handle = self._filehandles[fh]
615         else:
616             raise llfuse.FUSEError(errno.EBADF)
617
618         e = off
619         while e < len(handle.entries):
620             if handle.entries[e][1].inode in self.inodes:
621                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
622             e += 1
623
624     @catch_exceptions
625     def statfs(self):
626         st = llfuse.StatvfsData()
627         st.f_bsize = 128 * 1024
628         st.f_blocks = 0
629         st.f_files = 0
630
631         st.f_bfree = 0
632         st.f_bavail = 0
633
634         st.f_ffree = 0
635         st.f_favail = 0
636
637         st.f_frsize = 0
638         return st
639
640     def _check_writable(self, inode_parent):
641         if not self.enable_write:
642             raise llfuse.FUSEError(errno.EROFS)
643
644         if inode_parent in self.inodes:
645             p = self.inodes[inode_parent]
646         else:
647             raise llfuse.FUSEError(errno.ENOENT)
648
649         if not isinstance(p, Directory):
650             raise llfuse.FUSEError(errno.ENOTDIR)
651
652         if not p.writable():
653             raise llfuse.FUSEError(errno.EPERM)
654
655         return p
656
657     @catch_exceptions
658     def create(self, inode_parent, name, mode, flags, ctx):
659         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
660
661         p = self._check_writable(inode_parent)
662         p.create(name)
663
664         # The file entry should have been implicitly created by callback.
665         f = p[name]
666         fh = next(self._filehandles_counter)
667         self._filehandles[fh] = FileHandle(fh, f)
668         self.inodes.touch(p)
669
670         f.inc_ref()
671         return (fh, self.getattr(f.inode))
672
673     @catch_exceptions
674     def mkdir(self, inode_parent, name, mode, ctx):
675         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
676
677         p = self._check_writable(inode_parent)
678         p.mkdir(name)
679
680         # The dir entry should have been implicitly created by callback.
681         d = p[name]
682
683         d.inc_ref()
684         return self.getattr(d.inode)
685
686     @catch_exceptions
687     def unlink(self, inode_parent, name):
688         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
689         p = self._check_writable(inode_parent)
690         p.unlink(name)
691
692     @catch_exceptions
693     def rmdir(self, inode_parent, name):
694         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
695         p = self._check_writable(inode_parent)
696         p.rmdir(name)
697
698     @catch_exceptions
699     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
700         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
701         src = self._check_writable(inode_parent_old)
702         dest = self._check_writable(inode_parent_new)
703         dest.rename(name_old, name_new, src)
704
705     @catch_exceptions
706     def flush(self, fh):
707         if fh in self._filehandles:
708             self._filehandles[fh].flush()
709
710     def fsync(self, fh, datasync):
711         self.flush(fh)
712
713     def fsyncdir(self, fh, datasync):
714         self.flush(fh)