Hotfix, inode cache find() returns list instead of single item now. refs #6643
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69
70 import Queue
71
72 # Default _notify_queue has a limit of 1000 items, but it really needs to be
73 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
74 # details.
75
76 llfuse.capi._notify_queue = Queue.Queue()
77
78 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
79 from fusefile import StringFile, FuseArvadosFile
80
81 _logger = logging.getLogger('arvados.arvados_fuse')
82
83 # Uncomment this to enable llfuse debug logging.
84 # log_handler = logging.StreamHandler()
85 # llogger = logging.getLogger('llfuse')
86 # llogger.addHandler(log_handler)
87 # llogger.setLevel(logging.DEBUG)
88
89 class Handle(object):
90     """Connects a numeric file handle to a File or Directory object that has
91     been opened by the client."""
92
93     def __init__(self, fh, obj):
94         self.fh = fh
95         self.obj = obj
96         self.obj.inc_use()
97
98     def release(self):
99         self.obj.dec_use()
100
101     def flush(self):
102         if self.obj.writable():
103             return self.obj.flush()
104
105
106 class FileHandle(Handle):
107     """Connects a numeric file handle to a File  object that has
108     been opened by the client."""
109     pass
110
111
112 class DirectoryHandle(Handle):
113     """Connects a numeric file handle to a Directory object that has
114     been opened by the client."""
115
116     def __init__(self, fh, dirobj, entries):
117         super(DirectoryHandle, self).__init__(fh, dirobj)
118         self.entries = entries
119
120
121 class InodeCache(object):
122     """Records the memory footprint of objects and when they are last used.
123
124     When the cache limit is exceeded, the least recently used objects are
125     cleared.  Clearing the object means discarding its contents to release
126     memory.  The next time the object is accessed, it must be re-fetched from
127     the server.  Note that the inode cache limit is a soft limit; the cache
128     limit may be exceeded if necessary to load very large objects, it may also
129     be exceeded if open file handles prevent objects from being cleared.
130
131     """
132
133     def __init__(self, cap, min_entries=4):
134         self._entries = collections.OrderedDict()
135         self._by_uuid = {}
136         self._counter = itertools.count(0)
137         self.cap = cap
138         self._total = 0
139         self.min_entries = min_entries
140
141     def total(self):
142         return self._total
143
144     def _remove(self, obj, clear):
145         if clear and not obj.clear():
146             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
147             return False
148         self._total -= obj.cache_size
149         del self._entries[obj.cache_priority]
150         if obj.cache_uuid:
151             self._by_uuid[obj.cache_uuid].remove(obj)
152             if not self._by_uuid[obj.cache_uuid]:
153                 del self._by_uuid[obj.cache_uuid]
154             obj.cache_uuid = None
155         if clear:
156             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
157         return True
158
159     def cap_cache(self):
160         if self._total > self.cap:
161             for key in list(self._entries.keys()):
162                 if self._total < self.cap or len(self._entries) < self.min_entries:
163                     break
164                 self._remove(self._entries[key], True)
165
166     def manage(self, obj):
167         if obj.persisted():
168             obj.cache_priority = next(self._counter)
169             obj.cache_size = obj.objsize()
170             self._entries[obj.cache_priority] = obj
171             obj.cache_uuid = obj.uuid()
172             if obj.cache_uuid:
173                 if obj.cache_uuid not in self._by_uuid:
174                     self._by_uuid[obj.cache_uuid] = [obj]
175                 else:
176                     if obj not in self._by_uuid[obj.cache_uuid]:
177                         self._by_uuid[obj.cache_uuid].append(obj)
178             self._total += obj.objsize()
179             _logger.debug("InodeCache touched %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
180             self.cap_cache()
181         else:
182             obj.cache_priority = None
183
184     def touch(self, obj):
185         if obj.persisted():
186             if obj.cache_priority in self._entries:
187                 self._remove(obj, False)
188             self.manage(obj)
189
190     def unmanage(self, obj):
191         if obj.persisted() and obj.cache_priority in self._entries:
192             self._remove(obj, True)
193
194     def find(self, uuid):
195         return self._by_uuid.get(uuid)
196
197     def clear(self):
198         self._entries.clear()
199         self._by_uuid.clear()
200         self._total = 0
201
202 class Inodes(object):
203     """Manage the set of inodes.  This is the mapping from a numeric id
204     to a concrete File or Directory object"""
205
206     def __init__(self, inode_cache, encoding="utf-8"):
207         self._entries = {}
208         self._counter = itertools.count(llfuse.ROOT_INODE)
209         self.inode_cache = inode_cache
210         self.encoding = encoding
211         self.deferred_invalidations = []
212
213     def __getitem__(self, item):
214         return self._entries[item]
215
216     def __setitem__(self, key, item):
217         self._entries[key] = item
218
219     def __iter__(self):
220         return self._entries.iterkeys()
221
222     def items(self):
223         return self._entries.items()
224
225     def __contains__(self, k):
226         return k in self._entries
227
228     def touch(self, entry):
229         entry._atime = time.time()
230         self.inode_cache.touch(entry)
231
232     def add_entry(self, entry):
233         entry.inode = next(self._counter)
234         if entry.inode == llfuse.ROOT_INODE:
235             entry.inc_ref()
236         self._entries[entry.inode] = entry
237         self.inode_cache.manage(entry)
238         return entry
239
240     def del_entry(self, entry):
241         if entry.ref_count == 0:
242             self.inode_cache.unmanage(entry)
243             del self._entries[entry.inode]
244             with llfuse.lock_released:
245                 entry.finalize()
246             self.invalidate_inode(entry.inode)
247             entry.inode = None
248         else:
249             entry.dead = True
250             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
251
252     def invalidate_inode(self, inode):
253         llfuse.invalidate_inode(inode)
254
255     def invalidate_entry(self, inode, name):
256         llfuse.invalidate_entry(inode, name)
257
258     def clear(self):
259         self.inode_cache.clear()
260
261         for k,v in self._entries.items():
262             try:
263                 v.finalize()
264             except Exception as e:
265                 _logger.exception("Error during finalize of inode %i", k)
266
267         self._entries.clear()
268
269
270 def catch_exceptions(orig_func):
271     """Catch uncaught exceptions and log them consistently."""
272
273     @functools.wraps(orig_func)
274     def catch_exceptions_wrapper(self, *args, **kwargs):
275         try:
276             return orig_func(self, *args, **kwargs)
277         except llfuse.FUSEError:
278             raise
279         except EnvironmentError as e:
280             raise llfuse.FUSEError(e.errno)
281         except arvados.errors.KeepWriteError as e:
282             _logger.error("Keep write error: " + str(e))
283             raise llfuse.FUSEError(errno.EIO)
284         except arvados.errors.NotFoundError as e:
285             _logger.error("Block not found error: " + str(e))
286             raise llfuse.FUSEError(errno.EIO)
287         except:
288             _logger.exception("Unhandled exception during FUSE operation")
289             raise llfuse.FUSEError(errno.EIO)
290
291     return catch_exceptions_wrapper
292
293
294 class Operations(llfuse.Operations):
295     """This is the main interface with llfuse.
296
297     The methods on this object are called by llfuse threads to service FUSE
298     events to query and read from the file system.
299
300     llfuse has its own global lock which is acquired before calling a request handler,
301     so request handlers do not run concurrently unless the lock is explicitly released
302     using 'with llfuse.lock_released:'
303
304     """
305
306     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
307         super(Operations, self).__init__()
308
309         if not inode_cache:
310             inode_cache = InodeCache(cap=256*1024*1024)
311         self.inodes = Inodes(inode_cache, encoding=encoding)
312         self.uid = uid
313         self.gid = gid
314         self.enable_write = enable_write
315
316         # dict of inode to filehandle
317         self._filehandles = {}
318         self._filehandles_counter = itertools.count(0)
319
320         # Other threads that need to wait until the fuse driver
321         # is fully initialized should wait() on this event object.
322         self.initlock = threading.Event()
323
324         self.num_retries = num_retries
325
326         self.events = None
327
328     def init(self):
329         # Allow threads that are waiting for the driver to be finished
330         # initializing to continue
331         self.initlock.set()
332
333     @catch_exceptions
334     def destroy(self):
335         if self.events:
336             self.events.close()
337             self.events = None
338
339         self.inodes.clear()
340
341     def access(self, inode, mode, ctx):
342         return True
343
344     def listen_for_events(self, api_client):
345         self.events = arvados.events.subscribe(api_client,
346                                  [["event_type", "in", ["create", "update", "delete"]]],
347                                  self.on_event)
348
349     @catch_exceptions
350     def on_event(self, ev):
351         if 'event_type' in ev:
352             with llfuse.lock:
353                 items = self.inodes.inode_cache.find(ev["object_uuid"])
354                 if items is not None:
355                     for item in items:
356                         item.invalidate()
357                         if ev["object_kind"] == "arvados#collection":
358                             new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
359
360                             # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
361                             # should always be the same.
362                             #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
363                             record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
364
365                             item.update(to_record_version=record_version)
366                         else:
367                             item.update()
368
369                 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
370                 olditemparent = self.inodes.inode_cache.find(oldowner)
371                 if olditemparent is not None:
372                     olditemparent.invalidate()
373                     olditemparent.update()
374
375                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
376                 if itemparent is not None:
377                     itemparent.invalidate()
378                     itemparent.update()
379
380
381     @catch_exceptions
382     def getattr(self, inode):
383         if inode not in self.inodes:
384             raise llfuse.FUSEError(errno.ENOENT)
385
386         e = self.inodes[inode]
387
388         entry = llfuse.EntryAttributes()
389         entry.st_ino = inode
390         entry.generation = 0
391         entry.entry_timeout = 60
392         entry.attr_timeout = 60
393
394         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
395         if isinstance(e, Directory):
396             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
397         else:
398             entry.st_mode |= stat.S_IFREG
399             if isinstance(e, FuseArvadosFile):
400                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
401
402         if self.enable_write and e.writable():
403             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
404
405         entry.st_nlink = 1
406         entry.st_uid = self.uid
407         entry.st_gid = self.gid
408         entry.st_rdev = 0
409
410         entry.st_size = e.size()
411
412         entry.st_blksize = 512
413         entry.st_blocks = (entry.st_size/512)+1
414         entry.st_atime = int(e.atime())
415         entry.st_mtime = int(e.mtime())
416         entry.st_ctime = int(e.mtime())
417
418         return entry
419
420     @catch_exceptions
421     def setattr(self, inode, attr):
422         entry = self.getattr(inode)
423
424         e = self.inodes[inode]
425
426         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
427             with llfuse.lock_released:
428                 e.arvfile.truncate(attr.st_size)
429                 entry.st_size = e.arvfile.size()
430
431         return entry
432
433     @catch_exceptions
434     def lookup(self, parent_inode, name):
435         name = unicode(name, self.inodes.encoding)
436         inode = None
437
438         if name == '.':
439             inode = parent_inode
440         else:
441             if parent_inode in self.inodes:
442                 p = self.inodes[parent_inode]
443                 if name == '..':
444                     inode = p.parent_inode
445                 elif isinstance(p, Directory) and name in p:
446                     inode = p[name].inode
447
448         if inode != None:
449             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
450                       parent_inode, name, inode)
451             self.inodes[inode].inc_ref()
452             return self.getattr(inode)
453         else:
454             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
455                       parent_inode, name)
456             raise llfuse.FUSEError(errno.ENOENT)
457
458     @catch_exceptions
459     def forget(self, inodes):
460         for inode, nlookup in inodes:
461             ent = self.inodes[inode]
462             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
463             if ent.dec_ref(nlookup) == 0 and ent.dead:
464                 self.inodes.del_entry(ent)
465
466     @catch_exceptions
467     def open(self, inode, flags):
468         if inode in self.inodes:
469             p = self.inodes[inode]
470         else:
471             raise llfuse.FUSEError(errno.ENOENT)
472
473         if isinstance(p, Directory):
474             raise llfuse.FUSEError(errno.EISDIR)
475
476         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
477             raise llfuse.FUSEError(errno.EPERM)
478
479         fh = next(self._filehandles_counter)
480         self._filehandles[fh] = FileHandle(fh, p)
481         self.inodes.touch(p)
482         return fh
483
484     @catch_exceptions
485     def read(self, fh, off, size):
486         _logger.debug("arv-mount read %i %i %i", fh, off, size)
487         if fh in self._filehandles:
488             handle = self._filehandles[fh]
489         else:
490             raise llfuse.FUSEError(errno.EBADF)
491
492         self.inodes.touch(handle.obj)
493
494         return handle.obj.readfrom(off, size, self.num_retries)
495
496     @catch_exceptions
497     def write(self, fh, off, buf):
498         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
499         if fh in self._filehandles:
500             handle = self._filehandles[fh]
501         else:
502             raise llfuse.FUSEError(errno.EBADF)
503
504         if not handle.obj.writable():
505             raise llfuse.FUSEError(errno.EPERM)
506
507         self.inodes.touch(handle.obj)
508
509         return handle.obj.writeto(off, buf, self.num_retries)
510
511     @catch_exceptions
512     def release(self, fh):
513         if fh in self._filehandles:
514             try:
515                 self._filehandles[fh].flush()
516             except Exception:
517                 raise
518             finally:
519                 self._filehandles[fh].release()
520                 del self._filehandles[fh]
521         self.inodes.inode_cache.cap_cache()
522
523     def releasedir(self, fh):
524         self.release(fh)
525
526     @catch_exceptions
527     def opendir(self, inode):
528         _logger.debug("arv-mount opendir: inode %i", inode)
529
530         if inode in self.inodes:
531             p = self.inodes[inode]
532         else:
533             raise llfuse.FUSEError(errno.ENOENT)
534
535         if not isinstance(p, Directory):
536             raise llfuse.FUSEError(errno.ENOTDIR)
537
538         fh = next(self._filehandles_counter)
539         if p.parent_inode in self.inodes:
540             parent = self.inodes[p.parent_inode]
541         else:
542             raise llfuse.FUSEError(errno.EIO)
543
544         # update atime
545         self.inodes.touch(p)
546
547         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
548         return fh
549
550     @catch_exceptions
551     def readdir(self, fh, off):
552         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
553
554         if fh in self._filehandles:
555             handle = self._filehandles[fh]
556         else:
557             raise llfuse.FUSEError(errno.EBADF)
558
559         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
560
561         e = off
562         while e < len(handle.entries):
563             if handle.entries[e][1].inode in self.inodes:
564                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
565             e += 1
566
567     @catch_exceptions
568     def statfs(self):
569         st = llfuse.StatvfsData()
570         st.f_bsize = 128 * 1024
571         st.f_blocks = 0
572         st.f_files = 0
573
574         st.f_bfree = 0
575         st.f_bavail = 0
576
577         st.f_ffree = 0
578         st.f_favail = 0
579
580         st.f_frsize = 0
581         return st
582
583     def _check_writable(self, inode_parent):
584         if not self.enable_write:
585             raise llfuse.FUSEError(errno.EROFS)
586
587         if inode_parent in self.inodes:
588             p = self.inodes[inode_parent]
589         else:
590             raise llfuse.FUSEError(errno.ENOENT)
591
592         if not isinstance(p, Directory):
593             raise llfuse.FUSEError(errno.ENOTDIR)
594
595         if not p.writable():
596             raise llfuse.FUSEError(errno.EPERM)
597
598         return p
599
600     @catch_exceptions
601     def create(self, inode_parent, name, mode, flags, ctx):
602         _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
603
604         p = self._check_writable(inode_parent)
605         p.create(name)
606
607         # The file entry should have been implicitly created by callback.
608         f = p[name]
609         fh = next(self._filehandles_counter)
610         self._filehandles[fh] = FileHandle(fh, f)
611         self.inodes.touch(p)
612
613         f.inc_ref()
614         return (fh, self.getattr(f.inode))
615
616     @catch_exceptions
617     def mkdir(self, inode_parent, name, mode, ctx):
618         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
619
620         p = self._check_writable(inode_parent)
621         p.mkdir(name)
622
623         # The dir entry should have been implicitly created by callback.
624         d = p[name]
625
626         d.inc_ref()
627         return self.getattr(d.inode)
628
629     @catch_exceptions
630     def unlink(self, inode_parent, name):
631         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
632         p = self._check_writable(inode_parent)
633         p.unlink(name)
634
635     @catch_exceptions
636     def rmdir(self, inode_parent, name):
637         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
638         p = self._check_writable(inode_parent)
639         p.rmdir(name)
640
641     @catch_exceptions
642     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
643         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
644         src = self._check_writable(inode_parent_old)
645         dest = self._check_writable(inode_parent_new)
646         dest.rename(name_old, name_new, src)
647
648     @catch_exceptions
649     def flush(self, fh):
650         if fh in self._filehandles:
651             self._filehandles[fh].flush()
652
653     def fsync(self, fh, datasync):
654         self.flush(fh)
655
656     def fsyncdir(self, fh, datasync):
657         self.flush(fh)