10117: OrderedDict preserves insertion order, so use inode as key instead of cache_pr...
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69 import arvados.keep
70
71 import Queue
72
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
75 # details.
76
77 llfuse.capi._notify_queue = Queue.Queue()
78
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
81
82 _logger = logging.getLogger('arvados.arvados_fuse')
83
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
89
90 class Handle(object):
91     """Connects a numeric file handle to a File or Directory object that has
92     been opened by the client."""
93
94     def __init__(self, fh, obj):
95         self.fh = fh
96         self.obj = obj
97         self.obj.inc_use()
98
99     def release(self):
100         self.obj.dec_use()
101
102     def flush(self):
103         if self.obj.writable():
104             return self.obj.flush()
105
106
107 class FileHandle(Handle):
108     """Connects a numeric file handle to a File  object that has
109     been opened by the client."""
110     pass
111
112
113 class DirectoryHandle(Handle):
114     """Connects a numeric file handle to a Directory object that has
115     been opened by the client."""
116
117     def __init__(self, fh, dirobj, entries):
118         super(DirectoryHandle, self).__init__(fh, dirobj)
119         self.entries = entries
120
121
122 class InodeCache(object):
123     """Records the memory footprint of objects and when they are last used.
124
125     When the cache limit is exceeded, the least recently used objects are
126     cleared.  Clearing the object means discarding its contents to release
127     memory.  The next time the object is accessed, it must be re-fetched from
128     the server.  Note that the inode cache limit is a soft limit; the cache
129     limit may be exceeded if necessary to load very large objects, it may also
130     be exceeded if open file handles prevent objects from being cleared.
131
132     """
133
134     def __init__(self, cap, min_entries=4):
135         self._entries = collections.OrderedDict()
136         self._by_uuid = {}
137         self.cap = cap
138         self._total = 0
139         self.min_entries = min_entries
140
141     def total(self):
142         return self._total
143
144     def _remove(self, obj, clear):
145         if clear:
146             if obj.in_use():
147                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
148                 return False
149             if obj.has_ref(only_children=True):
150                 obj.kernel_invalidate()
151                 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
152                 return False
153             obj.clear()
154         self._total -= obj.cache_size
155         del self._entries[obj.inode]
156         if obj.cache_uuid:
157             self._by_uuid[obj.cache_uuid].remove(obj)
158             if not self._by_uuid[obj.cache_uuid]:
159                 del self._by_uuid[obj.cache_uuid]
160             obj.cache_uuid = None
161         if clear:
162             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
163         return True
164
165     def cap_cache(self):
166         if self._total > self.cap:
167             for ent in self._entries.values():
168                 if self._total < self.cap or len(self._entries) < self.min_entries:
169                     break
170                 self._remove(ent, True)
171
172     def manage(self, obj):
173         if obj.persisted():
174             obj.cache_size = obj.objsize()
175             self._entries[obj.inode] = obj
176             obj.cache_uuid = obj.uuid()
177             if obj.cache_uuid:
178                 if obj.cache_uuid not in self._by_uuid:
179                     self._by_uuid[obj.cache_uuid] = [obj]
180                 else:
181                     if obj not in self._by_uuid[obj.cache_uuid]:
182                         self._by_uuid[obj.cache_uuid].append(obj)
183             self._total += obj.objsize()
184             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
185             self.cap_cache()
186
187     def touch(self, obj):
188         if obj.persisted():
189             if obj.inode in self._entries:
190                 self._remove(obj, False)
191             self.manage(obj)
192
193     def unmanage(self, obj):
194         if obj.persisted() and obj.inode in self._entries:
195             self._remove(obj, True)
196
197     def find_by_uuid(self, uuid):
198         return self._by_uuid.get(uuid, [])
199
200     def clear(self):
201         self._entries.clear()
202         self._by_uuid.clear()
203         self._total = 0
204
205 class Inodes(object):
206     """Manage the set of inodes.  This is the mapping from a numeric id
207     to a concrete File or Directory object"""
208
209     def __init__(self, inode_cache, encoding="utf-8"):
210         self._entries = {}
211         self._counter = itertools.count(llfuse.ROOT_INODE)
212         self.inode_cache = inode_cache
213         self.encoding = encoding
214         self.deferred_invalidations = []
215
216     def __getitem__(self, item):
217         return self._entries[item]
218
219     def __setitem__(self, key, item):
220         self._entries[key] = item
221
222     def __iter__(self):
223         return self._entries.iterkeys()
224
225     def items(self):
226         return self._entries.items()
227
228     def __contains__(self, k):
229         return k in self._entries
230
231     def touch(self, entry):
232         entry._atime = time.time()
233         self.inode_cache.touch(entry)
234
235     def add_entry(self, entry):
236         entry.inode = next(self._counter)
237         if entry.inode == llfuse.ROOT_INODE:
238             entry.inc_ref()
239         self._entries[entry.inode] = entry
240         self.inode_cache.manage(entry)
241         return entry
242
243     def del_entry(self, entry):
244         if entry.ref_count == 0:
245             self.inode_cache.unmanage(entry)
246             del self._entries[entry.inode]
247             with llfuse.lock_released:
248                 entry.finalize()
249             self.invalidate_inode(entry.inode)
250             entry.inode = None
251         else:
252             entry.dead = True
253             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
254
255     def invalidate_inode(self, inode):
256         llfuse.invalidate_inode(inode)
257
258     def invalidate_entry(self, inode, name):
259         llfuse.invalidate_entry(inode, name.encode(self.encoding))
260
261     def clear(self):
262         self.inode_cache.clear()
263
264         for k,v in self._entries.items():
265             try:
266                 v.finalize()
267             except Exception as e:
268                 _logger.exception("Error during finalize of inode %i", k)
269
270         self._entries.clear()
271
272
273 def catch_exceptions(orig_func):
274     """Catch uncaught exceptions and log them consistently."""
275
276     @functools.wraps(orig_func)
277     def catch_exceptions_wrapper(self, *args, **kwargs):
278         try:
279             return orig_func(self, *args, **kwargs)
280         except llfuse.FUSEError:
281             raise
282         except EnvironmentError as e:
283             raise llfuse.FUSEError(e.errno)
284         except arvados.errors.KeepWriteError as e:
285             _logger.error("Keep write error: " + str(e))
286             raise llfuse.FUSEError(errno.EIO)
287         except arvados.errors.NotFoundError as e:
288             _logger.error("Block not found error: " + str(e))
289             raise llfuse.FUSEError(errno.EIO)
290         except:
291             _logger.exception("Unhandled exception during FUSE operation")
292             raise llfuse.FUSEError(errno.EIO)
293
294     return catch_exceptions_wrapper
295
296
297 class Operations(llfuse.Operations):
298     """This is the main interface with llfuse.
299
300     The methods on this object are called by llfuse threads to service FUSE
301     events to query and read from the file system.
302
303     llfuse has its own global lock which is acquired before calling a request handler,
304     so request handlers do not run concurrently unless the lock is explicitly released
305     using 'with llfuse.lock_released:'
306
307     """
308
309     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
310         super(Operations, self).__init__()
311
312         self._api_client = api_client
313
314         if not inode_cache:
315             inode_cache = InodeCache(cap=256*1024*1024)
316         self.inodes = Inodes(inode_cache, encoding=encoding)
317         self.uid = uid
318         self.gid = gid
319         self.enable_write = enable_write
320
321         # dict of inode to filehandle
322         self._filehandles = {}
323         self._filehandles_counter = itertools.count(0)
324
325         # Other threads that need to wait until the fuse driver
326         # is fully initialized should wait() on this event object.
327         self.initlock = threading.Event()
328
329         # If we get overlapping shutdown events (e.g., fusermount -u
330         # -z and operations.destroy()) llfuse calls forget() on inodes
331         # that have already been deleted. To avoid this, we make
332         # forget() a no-op if called after destroy().
333         self._shutdown_started = threading.Event()
334
335         self.num_retries = num_retries
336
337         self.read_counter = arvados.keep.Counter()
338         self.write_counter = arvados.keep.Counter()
339         self.read_ops_counter = arvados.keep.Counter()
340         self.write_ops_counter = arvados.keep.Counter()
341
342         self.events = None
343
344     def init(self):
345         # Allow threads that are waiting for the driver to be finished
346         # initializing to continue
347         self.initlock.set()
348
349     @catch_exceptions
350     def destroy(self):
351         with llfuse.lock:
352             self._shutdown_started.set()
353             if self.events:
354                 self.events.close()
355                 self.events = None
356
357             self.inodes.clear()
358
359     def access(self, inode, mode, ctx):
360         return True
361
362     def listen_for_events(self):
363         self.events = arvados.events.subscribe(self._api_client,
364                                  [["event_type", "in", ["create", "update", "delete"]]],
365                                  self.on_event)
366
367     @catch_exceptions
368     def on_event(self, ev):
369         if 'event_type' not in ev:
370             return
371         with llfuse.lock:
372             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
373                 item.invalidate()
374                 if ev["object_kind"] == "arvados#collection":
375                     new_attr = (ev.get("properties") and
376                                 ev["properties"].get("new_attributes") and
377                                 ev["properties"]["new_attributes"])
378
379                     # new_attributes.modified_at currently lacks
380                     # subsecond precision (see #6347) so use event_at
381                     # which should always be the same.
382                     record_version = (
383                         (ev["event_at"], new_attr["portable_data_hash"])
384                         if new_attr else None)
385
386                     item.update(to_record_version=record_version)
387                 else:
388                     item.update()
389
390             oldowner = (
391                 ev.get("properties") and
392                 ev["properties"].get("old_attributes") and
393                 ev["properties"]["old_attributes"].get("owner_uuid"))
394             newowner = ev["object_owner_uuid"]
395             for parent in (
396                     self.inodes.inode_cache.find_by_uuid(oldowner) +
397                     self.inodes.inode_cache.find_by_uuid(newowner)):
398                 parent.invalidate()
399                 parent.update()
400
401
402     @catch_exceptions
403     def getattr(self, inode):
404         if inode not in self.inodes:
405             raise llfuse.FUSEError(errno.ENOENT)
406
407         e = self.inodes[inode]
408
409         entry = llfuse.EntryAttributes()
410         entry.st_ino = inode
411         entry.generation = 0
412         entry.entry_timeout = 60 if e.allow_dirent_cache else 0
413         entry.attr_timeout = 60 if e.allow_attr_cache else 0
414
415         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
416         if isinstance(e, Directory):
417             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
418         else:
419             entry.st_mode |= stat.S_IFREG
420             if isinstance(e, FuseArvadosFile):
421                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
422
423         if self.enable_write and e.writable():
424             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
425
426         entry.st_nlink = 1
427         entry.st_uid = self.uid
428         entry.st_gid = self.gid
429         entry.st_rdev = 0
430
431         entry.st_size = e.size()
432
433         entry.st_blksize = 512
434         entry.st_blocks = (entry.st_size/512)+1
435         entry.st_atime = int(e.atime())
436         entry.st_mtime = int(e.mtime())
437         entry.st_ctime = int(e.mtime())
438
439         return entry
440
441     @catch_exceptions
442     def setattr(self, inode, attr):
443         entry = self.getattr(inode)
444
445         e = self.inodes[inode]
446
447         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
448             with llfuse.lock_released:
449                 e.arvfile.truncate(attr.st_size)
450                 entry.st_size = e.arvfile.size()
451
452         return entry
453
454     @catch_exceptions
455     def lookup(self, parent_inode, name):
456         name = unicode(name, self.inodes.encoding)
457         inode = None
458
459         if name == '.':
460             inode = parent_inode
461         else:
462             if parent_inode in self.inodes:
463                 p = self.inodes[parent_inode]
464                 self.inodes.touch(p)
465                 if name == '..':
466                     inode = p.parent_inode
467                 elif isinstance(p, Directory) and name in p:
468                     inode = p[name].inode
469
470         if inode != None:
471             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
472                       parent_inode, name, inode)
473             self.inodes[inode].inc_ref()
474             return self.getattr(inode)
475         else:
476             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
477                       parent_inode, name)
478             raise llfuse.FUSEError(errno.ENOENT)
479
480     @catch_exceptions
481     def forget(self, inodes):
482         if self._shutdown_started.is_set():
483             return
484         for inode, nlookup in inodes:
485             ent = self.inodes[inode]
486             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
487             if ent.dec_ref(nlookup) == 0 and ent.dead:
488                 self.inodes.del_entry(ent)
489
490     @catch_exceptions
491     def open(self, inode, flags):
492         if inode in self.inodes:
493             p = self.inodes[inode]
494         else:
495             raise llfuse.FUSEError(errno.ENOENT)
496
497         if isinstance(p, Directory):
498             raise llfuse.FUSEError(errno.EISDIR)
499
500         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
501             raise llfuse.FUSEError(errno.EPERM)
502
503         fh = next(self._filehandles_counter)
504         self._filehandles[fh] = FileHandle(fh, p)
505         self.inodes.touch(p)
506
507         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
508
509         return fh
510
511     @catch_exceptions
512     def read(self, fh, off, size):
513         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
514         self.read_ops_counter.add(1)
515
516         if fh in self._filehandles:
517             handle = self._filehandles[fh]
518         else:
519             raise llfuse.FUSEError(errno.EBADF)
520
521         self.inodes.touch(handle.obj)
522
523         r = handle.obj.readfrom(off, size, self.num_retries)
524         if r:
525             self.read_counter.add(len(r))
526         return r
527
528     @catch_exceptions
529     def write(self, fh, off, buf):
530         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
531         self.write_ops_counter.add(1)
532
533         if fh in self._filehandles:
534             handle = self._filehandles[fh]
535         else:
536             raise llfuse.FUSEError(errno.EBADF)
537
538         if not handle.obj.writable():
539             raise llfuse.FUSEError(errno.EPERM)
540
541         self.inodes.touch(handle.obj)
542
543         w = handle.obj.writeto(off, buf, self.num_retries)
544         if w:
545             self.write_counter.add(w)
546         return w
547
548     @catch_exceptions
549     def release(self, fh):
550         if fh in self._filehandles:
551             try:
552                 self._filehandles[fh].flush()
553             except Exception:
554                 raise
555             finally:
556                 self._filehandles[fh].release()
557                 del self._filehandles[fh]
558         self.inodes.inode_cache.cap_cache()
559
560     def releasedir(self, fh):
561         self.release(fh)
562
563     @catch_exceptions
564     def opendir(self, inode):
565         _logger.debug("arv-mount opendir: inode %i", inode)
566
567         if inode in self.inodes:
568             p = self.inodes[inode]
569         else:
570             raise llfuse.FUSEError(errno.ENOENT)
571
572         if not isinstance(p, Directory):
573             raise llfuse.FUSEError(errno.ENOTDIR)
574
575         fh = next(self._filehandles_counter)
576         if p.parent_inode in self.inodes:
577             parent = self.inodes[p.parent_inode]
578         else:
579             raise llfuse.FUSEError(errno.EIO)
580
581         # update atime
582         self.inodes.touch(p)
583
584         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
585         return fh
586
587     @catch_exceptions
588     def readdir(self, fh, off):
589         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
590
591         if fh in self._filehandles:
592             handle = self._filehandles[fh]
593         else:
594             raise llfuse.FUSEError(errno.EBADF)
595
596         e = off
597         while e < len(handle.entries):
598             if handle.entries[e][1].inode in self.inodes:
599                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
600             e += 1
601
602     @catch_exceptions
603     def statfs(self):
604         st = llfuse.StatvfsData()
605         st.f_bsize = 128 * 1024
606         st.f_blocks = 0
607         st.f_files = 0
608
609         st.f_bfree = 0
610         st.f_bavail = 0
611
612         st.f_ffree = 0
613         st.f_favail = 0
614
615         st.f_frsize = 0
616         return st
617
618     def _check_writable(self, inode_parent):
619         if not self.enable_write:
620             raise llfuse.FUSEError(errno.EROFS)
621
622         if inode_parent in self.inodes:
623             p = self.inodes[inode_parent]
624         else:
625             raise llfuse.FUSEError(errno.ENOENT)
626
627         if not isinstance(p, Directory):
628             raise llfuse.FUSEError(errno.ENOTDIR)
629
630         if not p.writable():
631             raise llfuse.FUSEError(errno.EPERM)
632
633         return p
634
635     @catch_exceptions
636     def create(self, inode_parent, name, mode, flags, ctx):
637         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
638
639         p = self._check_writable(inode_parent)
640         p.create(name)
641
642         # The file entry should have been implicitly created by callback.
643         f = p[name]
644         fh = next(self._filehandles_counter)
645         self._filehandles[fh] = FileHandle(fh, f)
646         self.inodes.touch(p)
647
648         f.inc_ref()
649         return (fh, self.getattr(f.inode))
650
651     @catch_exceptions
652     def mkdir(self, inode_parent, name, mode, ctx):
653         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
654
655         p = self._check_writable(inode_parent)
656         p.mkdir(name)
657
658         # The dir entry should have been implicitly created by callback.
659         d = p[name]
660
661         d.inc_ref()
662         return self.getattr(d.inode)
663
664     @catch_exceptions
665     def unlink(self, inode_parent, name):
666         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
667         p = self._check_writable(inode_parent)
668         p.unlink(name)
669
670     @catch_exceptions
671     def rmdir(self, inode_parent, name):
672         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
673         p = self._check_writable(inode_parent)
674         p.rmdir(name)
675
676     @catch_exceptions
677     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
678         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
679         src = self._check_writable(inode_parent_old)
680         dest = self._check_writable(inode_parent_new)
681         dest.rename(name_old, name_new, src)
682
683     @catch_exceptions
684     def flush(self, fh):
685         if fh in self._filehandles:
686             self._filehandles[fh].flush()
687
688     def fsync(self, fh, datasync):
689         self.flush(fh)
690
691     def fsyncdir(self, fh, datasync):
692         self.flush(fh)