3137: Refactor stats to record keep & fuse operations as well as bytes.
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69 import arvados.keep
70
71 import Queue
72
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
75 # details.
76
77 llfuse.capi._notify_queue = Queue.Queue()
78
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
81
82 _logger = logging.getLogger('arvados.arvados_fuse')
83
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
89
90 class Handle(object):
91     """Connects a numeric file handle to a File or Directory object that has
92     been opened by the client."""
93
94     def __init__(self, fh, obj):
95         self.fh = fh
96         self.obj = obj
97         self.obj.inc_use()
98
99     def release(self):
100         self.obj.dec_use()
101
102     def flush(self):
103         if self.obj.writable():
104             return self.obj.flush()
105
106
107 class FileHandle(Handle):
108     """Connects a numeric file handle to a File  object that has
109     been opened by the client."""
110     pass
111
112
113 class DirectoryHandle(Handle):
114     """Connects a numeric file handle to a Directory object that has
115     been opened by the client."""
116
117     def __init__(self, fh, dirobj, entries):
118         super(DirectoryHandle, self).__init__(fh, dirobj)
119         self.entries = entries
120
121
122 class InodeCache(object):
123     """Records the memory footprint of objects and when they are last used.
124
125     When the cache limit is exceeded, the least recently used objects are
126     cleared.  Clearing the object means discarding its contents to release
127     memory.  The next time the object is accessed, it must be re-fetched from
128     the server.  Note that the inode cache limit is a soft limit; the cache
129     limit may be exceeded if necessary to load very large objects, it may also
130     be exceeded if open file handles prevent objects from being cleared.
131
132     """
133
134     def __init__(self, cap, min_entries=4):
135         self._entries = collections.OrderedDict()
136         self._by_uuid = {}
137         self._counter = itertools.count(0)
138         self.cap = cap
139         self._total = 0
140         self.min_entries = min_entries
141
142     def total(self):
143         return self._total
144
145     def _remove(self, obj, clear):
146         if clear and not obj.clear():
147             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
148             return False
149         self._total -= obj.cache_size
150         del self._entries[obj.cache_priority]
151         if obj.cache_uuid:
152             self._by_uuid[obj.cache_uuid].remove(obj)
153             if not self._by_uuid[obj.cache_uuid]:
154                 del self._by_uuid[obj.cache_uuid]
155             obj.cache_uuid = None
156         if clear:
157             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
158         return True
159
160     def cap_cache(self):
161         if self._total > self.cap:
162             for key in list(self._entries.keys()):
163                 if self._total < self.cap or len(self._entries) < self.min_entries:
164                     break
165                 self._remove(self._entries[key], True)
166
167     def manage(self, obj):
168         if obj.persisted():
169             obj.cache_priority = next(self._counter)
170             obj.cache_size = obj.objsize()
171             self._entries[obj.cache_priority] = obj
172             obj.cache_uuid = obj.uuid()
173             if obj.cache_uuid:
174                 if obj.cache_uuid not in self._by_uuid:
175                     self._by_uuid[obj.cache_uuid] = [obj]
176                 else:
177                     if obj not in self._by_uuid[obj.cache_uuid]:
178                         self._by_uuid[obj.cache_uuid].append(obj)
179             self._total += obj.objsize()
180             _logger.debug("InodeCache touched %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
181             self.cap_cache()
182         else:
183             obj.cache_priority = None
184
185     def touch(self, obj):
186         if obj.persisted():
187             if obj.cache_priority in self._entries:
188                 self._remove(obj, False)
189             self.manage(obj)
190
191     def unmanage(self, obj):
192         if obj.persisted() and obj.cache_priority in self._entries:
193             self._remove(obj, True)
194
195     def find(self, uuid):
196         return self._by_uuid.get(uuid)
197
198     def clear(self):
199         self._entries.clear()
200         self._by_uuid.clear()
201         self._total = 0
202
203 class Inodes(object):
204     """Manage the set of inodes.  This is the mapping from a numeric id
205     to a concrete File or Directory object"""
206
207     def __init__(self, inode_cache, encoding="utf-8"):
208         self._entries = {}
209         self._counter = itertools.count(llfuse.ROOT_INODE)
210         self.inode_cache = inode_cache
211         self.encoding = encoding
212         self.deferred_invalidations = []
213
214     def __getitem__(self, item):
215         return self._entries[item]
216
217     def __setitem__(self, key, item):
218         self._entries[key] = item
219
220     def __iter__(self):
221         return self._entries.iterkeys()
222
223     def items(self):
224         return self._entries.items()
225
226     def __contains__(self, k):
227         return k in self._entries
228
229     def touch(self, entry):
230         entry._atime = time.time()
231         self.inode_cache.touch(entry)
232
233     def add_entry(self, entry):
234         entry.inode = next(self._counter)
235         if entry.inode == llfuse.ROOT_INODE:
236             entry.inc_ref()
237         self._entries[entry.inode] = entry
238         self.inode_cache.manage(entry)
239         return entry
240
241     def del_entry(self, entry):
242         if entry.ref_count == 0:
243             self.inode_cache.unmanage(entry)
244             del self._entries[entry.inode]
245             with llfuse.lock_released:
246                 entry.finalize()
247             self.invalidate_inode(entry.inode)
248             entry.inode = None
249         else:
250             entry.dead = True
251             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
252
253     def invalidate_inode(self, inode):
254         llfuse.invalidate_inode(inode)
255
256     def invalidate_entry(self, inode, name):
257         llfuse.invalidate_entry(inode, name)
258
259     def clear(self):
260         self.inode_cache.clear()
261
262         for k,v in self._entries.items():
263             try:
264                 v.finalize()
265             except Exception as e:
266                 _logger.exception("Error during finalize of inode %i", k)
267
268         self._entries.clear()
269
270
271 def catch_exceptions(orig_func):
272     """Catch uncaught exceptions and log them consistently."""
273
274     @functools.wraps(orig_func)
275     def catch_exceptions_wrapper(self, *args, **kwargs):
276         try:
277             return orig_func(self, *args, **kwargs)
278         except llfuse.FUSEError:
279             raise
280         except EnvironmentError as e:
281             raise llfuse.FUSEError(e.errno)
282         except arvados.errors.KeepWriteError as e:
283             _logger.error("Keep write error: " + str(e))
284             raise llfuse.FUSEError(errno.EIO)
285         except arvados.errors.NotFoundError as e:
286             _logger.error("Block not found error: " + str(e))
287             raise llfuse.FUSEError(errno.EIO)
288         except:
289             _logger.exception("Unhandled exception during FUSE operation")
290             raise llfuse.FUSEError(errno.EIO)
291
292     return catch_exceptions_wrapper
293
294
295 class Operations(llfuse.Operations):
296     """This is the main interface with llfuse.
297
298     The methods on this object are called by llfuse threads to service FUSE
299     events to query and read from the file system.
300
301     llfuse has its own global lock which is acquired before calling a request handler,
302     so request handlers do not run concurrently unless the lock is explicitly released
303     using 'with llfuse.lock_released:'
304
305     """
306
307     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
308         super(Operations, self).__init__()
309
310         if not inode_cache:
311             inode_cache = InodeCache(cap=256*1024*1024)
312         self.inodes = Inodes(inode_cache, encoding=encoding)
313         self.uid = uid
314         self.gid = gid
315         self.enable_write = enable_write
316
317         # dict of inode to filehandle
318         self._filehandles = {}
319         self._filehandles_counter = itertools.count(0)
320
321         # Other threads that need to wait until the fuse driver
322         # is fully initialized should wait() on this event object.
323         self.initlock = threading.Event()
324
325         self.num_retries = num_retries
326
327         self.read_counter = arvados.keep.Counter()
328         self.write_counter = arvados.keep.Counter()
329         self.read_ops_counter = arvados.keep.Counter()
330         self.write_ops_counter = arvados.keep.Counter()
331
332         self.events = None
333
334     def init(self):
335         # Allow threads that are waiting for the driver to be finished
336         # initializing to continue
337         self.initlock.set()
338
339     @catch_exceptions
340     def destroy(self):
341         if self.events:
342             self.events.close()
343             self.events = None
344
345         self.inodes.clear()
346
347     def access(self, inode, mode, ctx):
348         return True
349
350     def listen_for_events(self, api_client):
351         self.events = arvados.events.subscribe(api_client,
352                                  [["event_type", "in", ["create", "update", "delete"]]],
353                                  self.on_event)
354
355     @catch_exceptions
356     def on_event(self, ev):
357         if 'event_type' in ev:
358             with llfuse.lock:
359                 items = self.inodes.inode_cache.find(ev["object_uuid"])
360                 if items is not None:
361                     for item in items:
362                         item.invalidate()
363                         if ev["object_kind"] == "arvados#collection":
364                             new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
365
366                             # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
367                             # should always be the same.
368                             #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
369                             record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
370
371                             item.update(to_record_version=record_version)
372                         else:
373                             item.update()
374
375                 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
376                 olditemparent = self.inodes.inode_cache.find(oldowner)
377                 if olditemparent is not None:
378                     olditemparent.invalidate()
379                     olditemparent.update()
380
381                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
382                 if itemparent is not None:
383                     itemparent.invalidate()
384                     itemparent.update()
385
386
387     @catch_exceptions
388     def getattr(self, inode):
389         if inode not in self.inodes:
390             raise llfuse.FUSEError(errno.ENOENT)
391
392         e = self.inodes[inode]
393
394         entry = llfuse.EntryAttributes()
395         entry.st_ino = inode
396         entry.generation = 0
397         entry.entry_timeout = 60
398         entry.attr_timeout = 60
399
400         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
401         if isinstance(e, Directory):
402             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
403         else:
404             entry.st_mode |= stat.S_IFREG
405             if isinstance(e, FuseArvadosFile):
406                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
407
408         if self.enable_write and e.writable():
409             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
410
411         entry.st_nlink = 1
412         entry.st_uid = self.uid
413         entry.st_gid = self.gid
414         entry.st_rdev = 0
415
416         entry.st_size = e.size()
417
418         entry.st_blksize = 512
419         entry.st_blocks = (entry.st_size/512)+1
420         entry.st_atime = int(e.atime())
421         entry.st_mtime = int(e.mtime())
422         entry.st_ctime = int(e.mtime())
423
424         return entry
425
426     @catch_exceptions
427     def setattr(self, inode, attr):
428         entry = self.getattr(inode)
429
430         e = self.inodes[inode]
431
432         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
433             with llfuse.lock_released:
434                 e.arvfile.truncate(attr.st_size)
435                 entry.st_size = e.arvfile.size()
436
437         return entry
438
439     @catch_exceptions
440     def lookup(self, parent_inode, name):
441         name = unicode(name, self.inodes.encoding)
442         inode = None
443
444         if name == '.':
445             inode = parent_inode
446         else:
447             if parent_inode in self.inodes:
448                 p = self.inodes[parent_inode]
449                 if name == '..':
450                     inode = p.parent_inode
451                 elif isinstance(p, Directory) and name in p:
452                     inode = p[name].inode
453
454         if inode != None:
455             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
456                       parent_inode, name, inode)
457             self.inodes[inode].inc_ref()
458             return self.getattr(inode)
459         else:
460             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
461                       parent_inode, name)
462             raise llfuse.FUSEError(errno.ENOENT)
463
464     @catch_exceptions
465     def forget(self, inodes):
466         for inode, nlookup in inodes:
467             ent = self.inodes[inode]
468             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
469             if ent.dec_ref(nlookup) == 0 and ent.dead:
470                 self.inodes.del_entry(ent)
471
472     @catch_exceptions
473     def open(self, inode, flags):
474         if inode in self.inodes:
475             p = self.inodes[inode]
476         else:
477             raise llfuse.FUSEError(errno.ENOENT)
478
479         if isinstance(p, Directory):
480             raise llfuse.FUSEError(errno.EISDIR)
481
482         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
483             raise llfuse.FUSEError(errno.EPERM)
484
485         fh = next(self._filehandles_counter)
486         self._filehandles[fh] = FileHandle(fh, p)
487         self.inodes.touch(p)
488         return fh
489
490     @catch_exceptions
491     def read(self, fh, off, size):
492         _logger.debug("arv-mount read %i %i %i", fh, off, size)
493         self.read_ops_counter.add(1)
494
495         if fh in self._filehandles:
496             handle = self._filehandles[fh]
497         else:
498             raise llfuse.FUSEError(errno.EBADF)
499
500         self.inodes.touch(handle.obj)
501
502         r = handle.obj.readfrom(off, size, self.num_retries)
503         if r:
504             self.read_counter.add(len(r))
505         return r
506
507     @catch_exceptions
508     def write(self, fh, off, buf):
509         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
510         self.write_ops_counter.add(1)
511
512         if fh in self._filehandles:
513             handle = self._filehandles[fh]
514         else:
515             raise llfuse.FUSEError(errno.EBADF)
516
517         if not handle.obj.writable():
518             raise llfuse.FUSEError(errno.EPERM)
519
520         self.inodes.touch(handle.obj)
521
522         w = handle.obj.writeto(off, buf, self.num_retries)
523         if w:
524             self.write_counter.add(w)
525         return w
526
527     @catch_exceptions
528     def release(self, fh):
529         if fh in self._filehandles:
530             try:
531                 self._filehandles[fh].flush()
532             except Exception:
533                 raise
534             finally:
535                 self._filehandles[fh].release()
536                 del self._filehandles[fh]
537         self.inodes.inode_cache.cap_cache()
538
539     def releasedir(self, fh):
540         self.release(fh)
541
542     @catch_exceptions
543     def opendir(self, inode):
544         _logger.debug("arv-mount opendir: inode %i", inode)
545
546         if inode in self.inodes:
547             p = self.inodes[inode]
548         else:
549             raise llfuse.FUSEError(errno.ENOENT)
550
551         if not isinstance(p, Directory):
552             raise llfuse.FUSEError(errno.ENOTDIR)
553
554         fh = next(self._filehandles_counter)
555         if p.parent_inode in self.inodes:
556             parent = self.inodes[p.parent_inode]
557         else:
558             raise llfuse.FUSEError(errno.EIO)
559
560         # update atime
561         self.inodes.touch(p)
562
563         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
564         return fh
565
566     @catch_exceptions
567     def readdir(self, fh, off):
568         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
569
570         if fh in self._filehandles:
571             handle = self._filehandles[fh]
572         else:
573             raise llfuse.FUSEError(errno.EBADF)
574
575         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
576
577         e = off
578         while e < len(handle.entries):
579             if handle.entries[e][1].inode in self.inodes:
580                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
581             e += 1
582
583     @catch_exceptions
584     def statfs(self):
585         st = llfuse.StatvfsData()
586         st.f_bsize = 128 * 1024
587         st.f_blocks = 0
588         st.f_files = 0
589
590         st.f_bfree = 0
591         st.f_bavail = 0
592
593         st.f_ffree = 0
594         st.f_favail = 0
595
596         st.f_frsize = 0
597         return st
598
599     def _check_writable(self, inode_parent):
600         if not self.enable_write:
601             raise llfuse.FUSEError(errno.EROFS)
602
603         if inode_parent in self.inodes:
604             p = self.inodes[inode_parent]
605         else:
606             raise llfuse.FUSEError(errno.ENOENT)
607
608         if not isinstance(p, Directory):
609             raise llfuse.FUSEError(errno.ENOTDIR)
610
611         if not p.writable():
612             raise llfuse.FUSEError(errno.EPERM)
613
614         return p
615
616     @catch_exceptions
617     def create(self, inode_parent, name, mode, flags, ctx):
618         _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
619
620         p = self._check_writable(inode_parent)
621         p.create(name)
622
623         # The file entry should have been implicitly created by callback.
624         f = p[name]
625         fh = next(self._filehandles_counter)
626         self._filehandles[fh] = FileHandle(fh, f)
627         self.inodes.touch(p)
628
629         f.inc_ref()
630         return (fh, self.getattr(f.inode))
631
632     @catch_exceptions
633     def mkdir(self, inode_parent, name, mode, ctx):
634         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
635
636         p = self._check_writable(inode_parent)
637         p.mkdir(name)
638
639         # The dir entry should have been implicitly created by callback.
640         d = p[name]
641
642         d.inc_ref()
643         return self.getattr(d.inode)
644
645     @catch_exceptions
646     def unlink(self, inode_parent, name):
647         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
648         p = self._check_writable(inode_parent)
649         p.unlink(name)
650
651     @catch_exceptions
652     def rmdir(self, inode_parent, name):
653         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
654         p = self._check_writable(inode_parent)
655         p.rmdir(name)
656
657     @catch_exceptions
658     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
659         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
660         src = self._check_writable(inode_parent_old)
661         dest = self._check_writable(inode_parent_new)
662         dest.rename(name_old, name_new, src)
663
664     @catch_exceptions
665     def flush(self, fh):
666         if fh in self._filehandles:
667             self._filehandles[fh].flush()
668
669     def fsync(self, fh, datasync):
670         self.flush(fh)
671
672     def fsyncdir(self, fh, datasync):
673         self.flush(fh)