3198: Performance tuning based on benchmarking. Limit number of segments to
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69
70 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
71 from fusefile import StringFile, FuseArvadosFile
72
73 _logger = logging.getLogger('arvados.arvados_fuse')
74
75 # Uncomment this to enable llfuse debug logging.
76 # log_handler = logging.StreamHandler()
77 # llogger = logging.getLogger('llfuse')
78 # llogger.addHandler(log_handler)
79 # llogger.setLevel(logging.DEBUG)
80
81 class Handle(object):
82     """Connects a numeric file handle to a File or Directory object that has
83     been opened by the client."""
84
85     def __init__(self, fh, obj):
86         self.fh = fh
87         self.obj = obj
88         self.obj.inc_use()
89
90     def release(self):
91         self.obj.dec_use()
92
93     def flush(self):
94         return self.obj.flush()
95
96
97 class FileHandle(Handle):
98     """Connects a numeric file handle to a File  object that has
99     been opened by the client."""
100     pass
101
102
103 class DirectoryHandle(Handle):
104     """Connects a numeric file handle to a Directory object that has
105     been opened by the client."""
106
107     def __init__(self, fh, dirobj, entries):
108         super(DirectoryHandle, self).__init__(fh, dirobj)
109         self.entries = entries
110
111
112 class InodeCache(object):
113     """Records the memory footprint of objects and when they are last used.
114
115     When the cache limit is exceeded, the least recently used objects are
116     cleared.  Clearing the object means discarding its contents to release
117     memory.  The next time the object is accessed, it must be re-fetched from
118     the server.  Note that the inode cache limit is a soft limit; the cache
119     limit may be exceeded if necessary to load very large objects, it may also
120     be exceeded if open file handles prevent objects from being cleared.
121
122     """
123
124     def __init__(self, cap, min_entries=4):
125         self._entries = collections.OrderedDict()
126         self._by_uuid = {}
127         self._counter = itertools.count(0)
128         self.cap = cap
129         self._total = 0
130         self.min_entries = min_entries
131
132     def total(self):
133         return self._total
134
135     def _remove(self, obj, clear):
136         if clear and not obj.clear():
137             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
138             return False
139         self._total -= obj.cache_size
140         del self._entries[obj.cache_priority]
141         if obj.cache_uuid:
142             del self._by_uuid[obj.cache_uuid]
143             obj.cache_uuid = None
144         if clear:
145             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
146         return True
147
148     def cap_cache(self):
149         if self._total > self.cap:
150             for key in list(self._entries.keys()):
151                 if self._total < self.cap or len(self._entries) < self.min_entries:
152                     break
153                 self._remove(self._entries[key], True)
154
155     def manage(self, obj):
156         if obj.persisted():
157             obj.cache_priority = next(self._counter)
158             obj.cache_size = obj.objsize()
159             self._entries[obj.cache_priority] = obj
160             obj.cache_uuid = obj.uuid()
161             if obj.cache_uuid:
162                 self._by_uuid[obj.cache_uuid] = obj
163             self._total += obj.objsize()
164             _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
165             self.cap_cache()
166         else:
167             obj.cache_priority = None
168
169     def touch(self, obj):
170         if obj.persisted():
171             if obj.cache_priority in self._entries:
172                 self._remove(obj, False)
173             self.manage(obj)
174
175     def unmanage(self, obj):
176         if obj.persisted() and obj.cache_priority in self._entries:
177             self._remove(obj, True)
178
179     def find(self, uuid):
180         return self._by_uuid.get(uuid)
181
182 class Inodes(object):
183     """Manage the set of inodes.  This is the mapping from a numeric id
184     to a concrete File or Directory object"""
185
186     def __init__(self, inode_cache, encoding="utf-8"):
187         self._entries = {}
188         self._counter = itertools.count(llfuse.ROOT_INODE)
189         self.inode_cache = inode_cache
190         self.encoding = encoding
191
192     def __getitem__(self, item):
193         return self._entries[item]
194
195     def __setitem__(self, key, item):
196         self._entries[key] = item
197
198     def __iter__(self):
199         return self._entries.iterkeys()
200
201     def items(self):
202         return self._entries.items()
203
204     def __contains__(self, k):
205         return k in self._entries
206
207     def touch(self, entry):
208         entry._atime = time.time()
209         self.inode_cache.touch(entry)
210
211     def add_entry(self, entry):
212         entry.inode = next(self._counter)
213         if entry.inode == llfuse.ROOT_INODE:
214             entry.inc_ref()
215         self._entries[entry.inode] = entry
216         self.inode_cache.manage(entry)
217         return entry
218
219     def del_entry(self, entry):
220         if entry.ref_count == 0:
221             _logger.debug("Deleting inode %i", entry.inode)
222             self.inode_cache.unmanage(entry)
223             llfuse.invalidate_inode(entry.inode)
224             entry.finalize()
225             del self._entries[entry.inode]
226             entry.inode = None
227         else:
228             entry.dead = True
229             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
230
231
232 def catch_exceptions(orig_func):
233     """Catch uncaught exceptions and log them consistently."""
234
235     @functools.wraps(orig_func)
236     def catch_exceptions_wrapper(self, *args, **kwargs):
237         try:
238             return orig_func(self, *args, **kwargs)
239         except llfuse.FUSEError:
240             raise
241         except EnvironmentError as e:
242             raise llfuse.FUSEError(e.errno)
243         except:
244             _logger.exception("Unhandled exception during FUSE operation")
245             raise llfuse.FUSEError(errno.EIO)
246
247     return catch_exceptions_wrapper
248
249
250 class Operations(llfuse.Operations):
251     """This is the main interface with llfuse.
252
253     The methods on this object are called by llfuse threads to service FUSE
254     events to query and read from the file system.
255
256     llfuse has its own global lock which is acquired before calling a request handler,
257     so request handlers do not run concurrently unless the lock is explicitly released
258     using 'with llfuse.lock_released:'
259
260     """
261
262     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
263         super(Operations, self).__init__()
264
265         if not inode_cache:
266             inode_cache = InodeCache(cap=256*1024*1024)
267         self.inodes = Inodes(inode_cache, encoding=encoding)
268         self.uid = uid
269         self.gid = gid
270
271         # dict of inode to filehandle
272         self._filehandles = {}
273         self._filehandles_counter = itertools.count(0)
274
275         # Other threads that need to wait until the fuse driver
276         # is fully initialized should wait() on this event object.
277         self.initlock = threading.Event()
278
279         self.num_retries = num_retries
280
281         self.events = None
282
283     def init(self):
284         # Allow threads that are waiting for the driver to be finished
285         # initializing to continue
286         self.initlock.set()
287
288     @catch_exceptions
289     def destroy(self):
290         if self.events:
291             self.events.close()
292             self.events = None
293
294         for k,v in self.inodes.items():
295             v.finalize()
296         self.inodes = None
297
298     def access(self, inode, mode, ctx):
299         return True
300
301     def listen_for_events(self, api_client):
302         self.events = arvados.events.subscribe(api_client,
303                                  [["event_type", "in", ["create", "update", "delete"]]],
304                                  self.on_event)
305
306     @catch_exceptions
307     def on_event(self, ev):
308         if 'event_type' in ev:
309             with llfuse.lock:
310                 item = self.inodes.inode_cache.find(ev["object_uuid"])
311                 if item is not None:
312                     item.invalidate()
313                     if ev["object_kind"] == "arvados#collection":
314                         new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
315                         record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
316                         item.update(to_record_version=record_version)
317                     else:
318                         item.update()
319
320                 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
321                 olditemparent = self.inodes.inode_cache.find(oldowner)
322                 if olditemparent is not None:
323                     olditemparent.invalidate()
324                     olditemparent.update()
325
326                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
327                 if itemparent is not None:
328                     itemparent.invalidate()
329                     itemparent.update()
330
331     @catch_exceptions
332     def getattr(self, inode):
333         if inode not in self.inodes:
334             raise llfuse.FUSEError(errno.ENOENT)
335
336         e = self.inodes[inode]
337
338         entry = llfuse.EntryAttributes()
339         entry.st_ino = inode
340         entry.generation = 0
341         entry.entry_timeout = 60
342         entry.attr_timeout = 60
343
344         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
345         if isinstance(e, Directory):
346             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
347         else:
348             entry.st_mode |= stat.S_IFREG
349             if isinstance(e, FuseArvadosFile):
350                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
351
352         if e.writable():
353             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
354
355         entry.st_nlink = 1
356         entry.st_uid = self.uid
357         entry.st_gid = self.gid
358         entry.st_rdev = 0
359
360         entry.st_size = e.size()
361
362         entry.st_blksize = 512
363         entry.st_blocks = (entry.st_size/512)+1
364         entry.st_atime = int(e.atime())
365         entry.st_mtime = int(e.mtime())
366         entry.st_ctime = int(e.mtime())
367
368         return entry
369
370     @catch_exceptions
371     def setattr(self, inode, attr):
372         entry = self.getattr(inode)
373
374         e = self.inodes[inode]
375
376         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
377             with llfuse.lock_released:
378                 e.arvfile.truncate(attr.st_size)
379                 entry.st_size = e.arvfile.size()
380
381         return entry
382
383     @catch_exceptions
384     def lookup(self, parent_inode, name):
385         name = unicode(name, self.inodes.encoding)
386         inode = None
387
388         if name == '.':
389             inode = parent_inode
390         else:
391             if parent_inode in self.inodes:
392                 p = self.inodes[parent_inode]
393                 if name == '..':
394                     inode = p.parent_inode
395                 elif isinstance(p, Directory) and name in p:
396                     inode = p[name].inode
397
398         if inode != None:
399             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
400                       parent_inode, name, inode)
401             self.inodes[inode].inc_ref()
402             return self.getattr(inode)
403         else:
404             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
405                       parent_inode, name)
406             raise llfuse.FUSEError(errno.ENOENT)
407
408     @catch_exceptions
409     def forget(self, inodes):
410         for inode, nlookup in inodes:
411             ent = self.inodes[inode]
412             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
413             if ent.dec_ref(nlookup) == 0 and ent.dead:
414                 self.inodes.del_entry(ent)
415
416     @catch_exceptions
417     def open(self, inode, flags):
418         if inode in self.inodes:
419             p = self.inodes[inode]
420         else:
421             raise llfuse.FUSEError(errno.ENOENT)
422
423         if isinstance(p, Directory):
424             raise llfuse.FUSEError(errno.EISDIR)
425
426         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
427             raise llfuse.FUSEError(errno.EPERM)
428
429         fh = next(self._filehandles_counter)
430         self._filehandles[fh] = FileHandle(fh, p)
431         self.inodes.touch(p)
432         return fh
433
434     @catch_exceptions
435     def read(self, fh, off, size):
436         _logger.debug("arv-mount read %i %i %i", fh, off, size)
437         if fh in self._filehandles:
438             handle = self._filehandles[fh]
439         else:
440             raise llfuse.FUSEError(errno.EBADF)
441
442         self.inodes.touch(handle.obj)
443
444         try:
445             return handle.obj.readfrom(off, size, self.num_retries)
446         except arvados.errors.NotFoundError as e:
447             _logger.error("Block not found: " + str(e))
448             raise llfuse.FUSEError(errno.EIO)
449
450     @catch_exceptions
451     def write(self, fh, off, buf):
452         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
453         if fh in self._filehandles:
454             handle = self._filehandles[fh]
455         else:
456             raise llfuse.FUSEError(errno.EBADF)
457
458         if not handle.obj.writable():
459             raise llfuse.FUSEError(errno.EPERM)
460
461         self.inodes.touch(handle.obj)
462
463         return handle.obj.writeto(off, buf, self.num_retries)
464
465     @catch_exceptions
466     def release(self, fh):
467         if fh in self._filehandles:
468             try:
469                 self._filehandles[fh].flush()
470             except EnvironmentError as e:
471                 raise llfuse.FUSEError(e.errno)
472             except Exception:
473                 _logger.exception("Flush error")
474             self._filehandles[fh].release()
475             del self._filehandles[fh]
476         self.inodes.inode_cache.cap_cache()
477
478     def releasedir(self, fh):
479         self.release(fh)
480
481     @catch_exceptions
482     def opendir(self, inode):
483         _logger.debug("arv-mount opendir: inode %i", inode)
484
485         if inode in self.inodes:
486             p = self.inodes[inode]
487         else:
488             raise llfuse.FUSEError(errno.ENOENT)
489
490         if not isinstance(p, Directory):
491             raise llfuse.FUSEError(errno.ENOTDIR)
492
493         fh = next(self._filehandles_counter)
494         if p.parent_inode in self.inodes:
495             parent = self.inodes[p.parent_inode]
496         else:
497             raise llfuse.FUSEError(errno.EIO)
498
499         # update atime
500         self.inodes.touch(p)
501
502         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
503         return fh
504
505     @catch_exceptions
506     def readdir(self, fh, off):
507         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
508
509         if fh in self._filehandles:
510             handle = self._filehandles[fh]
511         else:
512             raise llfuse.FUSEError(errno.EBADF)
513
514         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
515
516         e = off
517         while e < len(handle.entries):
518             if handle.entries[e][1].inode in self.inodes:
519                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
520             e += 1
521
522     @catch_exceptions
523     def statfs(self):
524         st = llfuse.StatvfsData()
525         st.f_bsize = 128 * 1024
526         st.f_blocks = 0
527         st.f_files = 0
528
529         st.f_bfree = 0
530         st.f_bavail = 0
531
532         st.f_ffree = 0
533         st.f_favail = 0
534
535         st.f_frsize = 0
536         return st
537
538     def _check_writable(self, inode_parent):
539         if inode_parent in self.inodes:
540             p = self.inodes[inode_parent]
541         else:
542             raise llfuse.FUSEError(errno.ENOENT)
543
544         if not isinstance(p, Directory):
545             raise llfuse.FUSEError(errno.ENOTDIR)
546
547         if not p.writable():
548             raise llfuse.FUSEError(errno.EPERM)
549
550         return p
551
552     @catch_exceptions
553     def create(self, inode_parent, name, mode, flags, ctx):
554         p = self._check_writable(inode_parent)
555         p.create(name)
556
557         # The file entry should have been implicitly created by callback.
558         f = p[name]
559         fh = next(self._filehandles_counter)
560         self._filehandles[fh] = FileHandle(fh, f)
561         self.inodes.touch(p)
562
563         f.inc_ref()
564         return (fh, self.getattr(f.inode))
565
566     @catch_exceptions
567     def mkdir(self, inode_parent, name, mode, ctx):
568         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
569
570         p = self._check_writable(inode_parent)
571         p.mkdir(name)
572
573         # The dir entry should have been implicitly created by callback.
574         d = p[name]
575
576         d.inc_ref()
577         return self.getattr(d.inode)
578
579     @catch_exceptions
580     def unlink(self, inode_parent, name):
581         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
582         p = self._check_writable(inode_parent)
583         p.unlink(name)
584
585     @catch_exceptions
586     def rmdir(self, inode_parent, name):
587         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
588         p = self._check_writable(inode_parent)
589         p.rmdir(name)
590
591     @catch_exceptions
592     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
593         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
594         src = self._check_writable(inode_parent_old)
595         dest = self._check_writable(inode_parent_new)
596         dest.rename(name_old, name_new, src)
597
598     @catch_exceptions
599     def flush(self, fh):
600         if fh in self._filehandles:
601             self._filehandles[fh].flush()
602
603     def fsync(self, fh, datasync):
604         self.flush(fh)
605
606     def fsyncdir(self, fh, datasync):
607         self.flush(fh)