Merge branch '8784-dir-listings'
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 """FUSE driver for Arvados Keep
6
7 Architecture:
8
9 There is one `Operations` object per mount point.  It is the entry point for all
10 read and write requests from the llfuse module.
11
12 The operations object owns an `Inodes` object.  The inodes object stores the
13 mapping from numeric inode (used throughout the file system API to uniquely
14 identify files) to the Python objects that implement files and directories.
15
16 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
17 memory footprint of file system objects and when they are last used.  When the
18 cache limit is exceeded, the least recently used objects are cleared.
19
20 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
21
22 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
23 which implement actual reads and writes.
24
25 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
26 a Python dict which stores the mapping from filenames to directory entries.
27 Directory contents can be accessed through the Python operators such as `[]`
28 and `in`.  These methods automatically check if the directory is fresh (up to
29 date) or stale (needs update) and will call `update` if necessary before
30 returing a result.
31
32 The general FUSE operation flow is as follows:
33
34 - The request handler is called with either an inode or file handle that is the
35   subject of the operation.
36
37 - Look up the inode using the Inodes table or the file handle in the
38   filehandles table to get the file system object.
39
40 - For methods that alter files or directories, check that the operation is
41   valid and permitted using _check_writable().
42
43 - Call the relevant method on the file system object.
44
45 - Return the result.
46
47 The FUSE driver supports the Arvados event bus.  When an event is received for
48 an object that is live in the inode cache, that object is immediately updated.
49
50 """
51
52 import os
53 import sys
54 import llfuse
55 import errno
56 import stat
57 import threading
58 import arvados
59 import pprint
60 import arvados.events
61 import re
62 import apiclient
63 import json
64 import logging
65 import time
66 import _strptime
67 import calendar
68 import threading
69 import itertools
70 import ciso8601
71 import collections
72 import functools
73 import arvados.keep
74
75 import Queue
76
77 # Default _notify_queue has a limit of 1000 items, but it really needs to be
78 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
79 # details.
80
81 if hasattr(llfuse, 'capi'):
82     # llfuse < 0.42
83     llfuse.capi._notify_queue = Queue.Queue()
84 else:
85     # llfuse >= 0.42
86     llfuse._notify_queue = Queue.Queue()
87
88 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
89 from fusefile import StringFile, FuseArvadosFile
90
91 _logger = logging.getLogger('arvados.arvados_fuse')
92
93 # Uncomment this to enable llfuse debug logging.
94 # log_handler = logging.StreamHandler()
95 # llogger = logging.getLogger('llfuse')
96 # llogger.addHandler(log_handler)
97 # llogger.setLevel(logging.DEBUG)
98
99 class Handle(object):
100     """Connects a numeric file handle to a File or Directory object that has
101     been opened by the client."""
102
103     def __init__(self, fh, obj):
104         self.fh = fh
105         self.obj = obj
106         self.obj.inc_use()
107
108     def release(self):
109         self.obj.dec_use()
110
111     def flush(self):
112         pass
113
114
115 class FileHandle(Handle):
116     """Connects a numeric file handle to a File  object that has
117     been opened by the client."""
118
119     def flush(self):
120         if self.obj.writable():
121             return self.obj.flush()
122
123
124 class DirectoryHandle(Handle):
125     """Connects a numeric file handle to a Directory object that has
126     been opened by the client."""
127
128     def __init__(self, fh, dirobj, entries):
129         super(DirectoryHandle, self).__init__(fh, dirobj)
130         self.entries = entries
131
132
133 class InodeCache(object):
134     """Records the memory footprint of objects and when they are last used.
135
136     When the cache limit is exceeded, the least recently used objects are
137     cleared.  Clearing the object means discarding its contents to release
138     memory.  The next time the object is accessed, it must be re-fetched from
139     the server.  Note that the inode cache limit is a soft limit; the cache
140     limit may be exceeded if necessary to load very large objects, it may also
141     be exceeded if open file handles prevent objects from being cleared.
142
143     """
144
145     def __init__(self, cap, min_entries=4):
146         self._entries = collections.OrderedDict()
147         self._by_uuid = {}
148         self.cap = cap
149         self._total = 0
150         self.min_entries = min_entries
151
152     def total(self):
153         return self._total
154
155     def _remove(self, obj, clear):
156         if clear:
157             if obj.in_use():
158                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
159                 return
160             if obj.has_ref(True):
161                 obj.kernel_invalidate()
162                 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
163                 return
164             obj.clear()
165
166         # The llfuse lock is released in del_entry(), which is called by
167         # Directory.clear().  While the llfuse lock is released, it can happen
168         # that a reentrant call removes this entry before this call gets to it.
169         # Ensure that the entry is still valid before trying to remove it.
170         if obj.inode not in self._entries:
171             return
172
173         self._total -= obj.cache_size
174         del self._entries[obj.inode]
175         if obj.cache_uuid:
176             self._by_uuid[obj.cache_uuid].remove(obj)
177             if not self._by_uuid[obj.cache_uuid]:
178                 del self._by_uuid[obj.cache_uuid]
179             obj.cache_uuid = None
180         if clear:
181             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
182
183     def cap_cache(self):
184         if self._total > self.cap:
185             for ent in self._entries.values():
186                 if self._total < self.cap or len(self._entries) < self.min_entries:
187                     break
188                 self._remove(ent, True)
189
190     def manage(self, obj):
191         if obj.persisted():
192             obj.cache_size = obj.objsize()
193             self._entries[obj.inode] = obj
194             obj.cache_uuid = obj.uuid()
195             if obj.cache_uuid:
196                 if obj.cache_uuid not in self._by_uuid:
197                     self._by_uuid[obj.cache_uuid] = [obj]
198                 else:
199                     if obj not in self._by_uuid[obj.cache_uuid]:
200                         self._by_uuid[obj.cache_uuid].append(obj)
201             self._total += obj.objsize()
202             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
203             self.cap_cache()
204
205     def touch(self, obj):
206         if obj.persisted():
207             if obj.inode in self._entries:
208                 self._remove(obj, False)
209             self.manage(obj)
210
211     def unmanage(self, obj):
212         if obj.persisted() and obj.inode in self._entries:
213             self._remove(obj, True)
214
215     def find_by_uuid(self, uuid):
216         return self._by_uuid.get(uuid, [])
217
218     def clear(self):
219         self._entries.clear()
220         self._by_uuid.clear()
221         self._total = 0
222
223 class Inodes(object):
224     """Manage the set of inodes.  This is the mapping from a numeric id
225     to a concrete File or Directory object"""
226
227     def __init__(self, inode_cache, encoding="utf-8"):
228         self._entries = {}
229         self._counter = itertools.count(llfuse.ROOT_INODE)
230         self.inode_cache = inode_cache
231         self.encoding = encoding
232         self.deferred_invalidations = []
233
234     def __getitem__(self, item):
235         return self._entries[item]
236
237     def __setitem__(self, key, item):
238         self._entries[key] = item
239
240     def __iter__(self):
241         return self._entries.iterkeys()
242
243     def items(self):
244         return self._entries.items()
245
246     def __contains__(self, k):
247         return k in self._entries
248
249     def touch(self, entry):
250         entry._atime = time.time()
251         self.inode_cache.touch(entry)
252
253     def add_entry(self, entry):
254         entry.inode = next(self._counter)
255         if entry.inode == llfuse.ROOT_INODE:
256             entry.inc_ref()
257         self._entries[entry.inode] = entry
258         self.inode_cache.manage(entry)
259         return entry
260
261     def del_entry(self, entry):
262         if entry.ref_count == 0:
263             self.inode_cache.unmanage(entry)
264             del self._entries[entry.inode]
265             with llfuse.lock_released:
266                 entry.finalize()
267             self.invalidate_inode(entry.inode)
268             entry.inode = None
269         else:
270             entry.dead = True
271             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
272
273     def invalidate_inode(self, inode):
274         llfuse.invalidate_inode(inode)
275
276     def invalidate_entry(self, inode, name):
277         llfuse.invalidate_entry(inode, name.encode(self.encoding))
278
279     def clear(self):
280         self.inode_cache.clear()
281
282         for k,v in self._entries.items():
283             try:
284                 v.finalize()
285             except Exception as e:
286                 _logger.exception("Error during finalize of inode %i", k)
287
288         self._entries.clear()
289
290
291 def catch_exceptions(orig_func):
292     """Catch uncaught exceptions and log them consistently."""
293
294     @functools.wraps(orig_func)
295     def catch_exceptions_wrapper(self, *args, **kwargs):
296         try:
297             return orig_func(self, *args, **kwargs)
298         except llfuse.FUSEError:
299             raise
300         except EnvironmentError as e:
301             raise llfuse.FUSEError(e.errno)
302         except arvados.errors.KeepWriteError as e:
303             _logger.error("Keep write error: " + str(e))
304             raise llfuse.FUSEError(errno.EIO)
305         except arvados.errors.NotFoundError as e:
306             _logger.error("Block not found error: " + str(e))
307             raise llfuse.FUSEError(errno.EIO)
308         except:
309             _logger.exception("Unhandled exception during FUSE operation")
310             raise llfuse.FUSEError(errno.EIO)
311
312     return catch_exceptions_wrapper
313
314
315 class Operations(llfuse.Operations):
316     """This is the main interface with llfuse.
317
318     The methods on this object are called by llfuse threads to service FUSE
319     events to query and read from the file system.
320
321     llfuse has its own global lock which is acquired before calling a request handler,
322     so request handlers do not run concurrently unless the lock is explicitly released
323     using 'with llfuse.lock_released:'
324
325     """
326
327     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
328         super(Operations, self).__init__()
329
330         self._api_client = api_client
331
332         if not inode_cache:
333             inode_cache = InodeCache(cap=256*1024*1024)
334         self.inodes = Inodes(inode_cache, encoding=encoding)
335         self.uid = uid
336         self.gid = gid
337         self.enable_write = enable_write
338
339         # dict of inode to filehandle
340         self._filehandles = {}
341         self._filehandles_counter = itertools.count(0)
342
343         # Other threads that need to wait until the fuse driver
344         # is fully initialized should wait() on this event object.
345         self.initlock = threading.Event()
346
347         # If we get overlapping shutdown events (e.g., fusermount -u
348         # -z and operations.destroy()) llfuse calls forget() on inodes
349         # that have already been deleted. To avoid this, we make
350         # forget() a no-op if called after destroy().
351         self._shutdown_started = threading.Event()
352
353         self.num_retries = num_retries
354
355         self.read_counter = arvados.keep.Counter()
356         self.write_counter = arvados.keep.Counter()
357         self.read_ops_counter = arvados.keep.Counter()
358         self.write_ops_counter = arvados.keep.Counter()
359
360         self.events = None
361
362     def init(self):
363         # Allow threads that are waiting for the driver to be finished
364         # initializing to continue
365         self.initlock.set()
366
367     @catch_exceptions
368     def destroy(self):
369         self._shutdown_started.set()
370         if self.events:
371             self.events.close()
372             self.events = None
373
374         if llfuse.lock.acquire():
375             # llfuse < 0.42
376             self.inodes.clear()
377             llfuse.lock.release()
378         else:
379             # llfuse >= 0.42
380             self.inodes.clear()
381
382     def access(self, inode, mode, ctx):
383         return True
384
385     def listen_for_events(self):
386         self.events = arvados.events.subscribe(
387             self._api_client,
388             [["event_type", "in", ["create", "update", "delete"]]],
389             self.on_event)
390
391     @catch_exceptions
392     def on_event(self, ev):
393         if 'event_type' not in ev:
394             return
395         with llfuse.lock:
396             new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
397             pdh = new_attrs.get("portable_data_hash")
398             # new_attributes.modified_at currently lacks
399             # subsecond precision (see #6347) so use event_at
400             # which should always be the same.
401             stamp = ev.get("event_at")
402
403             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
404                 item.invalidate()
405                 if stamp and pdh and ev.get("object_kind") == "arvados#collection":
406                     item.update(to_record_version=(stamp, pdh))
407                 else:
408                     item.update()
409
410             oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
411             newowner = ev.get("object_owner_uuid")
412             for parent in (
413                     self.inodes.inode_cache.find_by_uuid(oldowner) +
414                     self.inodes.inode_cache.find_by_uuid(newowner)):
415                 parent.invalidate()
416                 parent.update()
417
418     @catch_exceptions
419     def getattr(self, inode, ctx=None):
420         if inode not in self.inodes:
421             raise llfuse.FUSEError(errno.ENOENT)
422
423         e = self.inodes[inode]
424
425         entry = llfuse.EntryAttributes()
426         entry.st_ino = inode
427         entry.generation = 0
428         entry.entry_timeout = 60 if e.allow_dirent_cache else 0
429         entry.attr_timeout = 60 if e.allow_attr_cache else 0
430
431         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
432         if isinstance(e, Directory):
433             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
434         else:
435             entry.st_mode |= stat.S_IFREG
436             if isinstance(e, FuseArvadosFile):
437                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
438
439         if self.enable_write and e.writable():
440             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
441
442         entry.st_nlink = 1
443         entry.st_uid = self.uid
444         entry.st_gid = self.gid
445         entry.st_rdev = 0
446
447         entry.st_size = e.size()
448
449         entry.st_blksize = 512
450         entry.st_blocks = (entry.st_size/512)+1
451         if hasattr(entry, 'st_atime_ns'):
452             # llfuse >= 0.42
453             entry.st_atime_ns = int(e.atime() * 1000000000)
454             entry.st_mtime_ns = int(e.mtime() * 1000000000)
455             entry.st_ctime_ns = int(e.mtime() * 1000000000)
456         else:
457             # llfuse < 0.42
458             entry.st_atime = int(e.atime)
459             entry.st_mtime = int(e.mtime)
460             entry.st_ctime = int(e.mtime)
461
462         return entry
463
464     @catch_exceptions
465     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
466         entry = self.getattr(inode)
467
468         if fh is not None and fh in self._filehandles:
469             handle = self._filehandles[fh]
470             e = handle.obj
471         else:
472             e = self.inodes[inode]
473
474         if fields is None:
475             # llfuse < 0.42
476             update_size = attr.st_size is not None
477         else:
478             # llfuse >= 0.42
479             update_size = fields.update_size
480         if update_size and isinstance(e, FuseArvadosFile):
481             with llfuse.lock_released:
482                 e.arvfile.truncate(attr.st_size)
483                 entry.st_size = e.arvfile.size()
484
485         return entry
486
487     @catch_exceptions
488     def lookup(self, parent_inode, name, ctx=None):
489         name = unicode(name, self.inodes.encoding)
490         inode = None
491
492         if name == '.':
493             inode = parent_inode
494         else:
495             if parent_inode in self.inodes:
496                 p = self.inodes[parent_inode]
497                 self.inodes.touch(p)
498                 if name == '..':
499                     inode = p.parent_inode
500                 elif isinstance(p, Directory) and name in p:
501                     inode = p[name].inode
502
503         if inode != None:
504             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
505                       parent_inode, name, inode)
506             self.inodes[inode].inc_ref()
507             return self.getattr(inode)
508         else:
509             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
510                       parent_inode, name)
511             raise llfuse.FUSEError(errno.ENOENT)
512
513     @catch_exceptions
514     def forget(self, inodes):
515         if self._shutdown_started.is_set():
516             return
517         for inode, nlookup in inodes:
518             ent = self.inodes[inode]
519             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
520             if ent.dec_ref(nlookup) == 0 and ent.dead:
521                 self.inodes.del_entry(ent)
522
523     @catch_exceptions
524     def open(self, inode, flags, ctx=None):
525         if inode in self.inodes:
526             p = self.inodes[inode]
527         else:
528             raise llfuse.FUSEError(errno.ENOENT)
529
530         if isinstance(p, Directory):
531             raise llfuse.FUSEError(errno.EISDIR)
532
533         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
534             raise llfuse.FUSEError(errno.EPERM)
535
536         fh = next(self._filehandles_counter)
537         self._filehandles[fh] = FileHandle(fh, p)
538         self.inodes.touch(p)
539
540         # Normally, we will have received an "update" event if the
541         # parent collection is stale here. However, even if the parent
542         # collection hasn't changed, the manifest might have been
543         # fetched so long ago that the signatures on the data block
544         # locators have expired. Calling checkupdate() on all
545         # ancestors ensures the signatures will be refreshed if
546         # necessary.
547         while p.parent_inode in self.inodes:
548             if p == self.inodes[p.parent_inode]:
549                 break
550             p = self.inodes[p.parent_inode]
551             self.inodes.touch(p)
552             p.checkupdate()
553
554         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
555
556         return fh
557
558     @catch_exceptions
559     def read(self, fh, off, size):
560         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
561         self.read_ops_counter.add(1)
562
563         if fh in self._filehandles:
564             handle = self._filehandles[fh]
565         else:
566             raise llfuse.FUSEError(errno.EBADF)
567
568         self.inodes.touch(handle.obj)
569
570         r = handle.obj.readfrom(off, size, self.num_retries)
571         if r:
572             self.read_counter.add(len(r))
573         return r
574
575     @catch_exceptions
576     def write(self, fh, off, buf):
577         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
578         self.write_ops_counter.add(1)
579
580         if fh in self._filehandles:
581             handle = self._filehandles[fh]
582         else:
583             raise llfuse.FUSEError(errno.EBADF)
584
585         if not handle.obj.writable():
586             raise llfuse.FUSEError(errno.EPERM)
587
588         self.inodes.touch(handle.obj)
589
590         w = handle.obj.writeto(off, buf, self.num_retries)
591         if w:
592             self.write_counter.add(w)
593         return w
594
595     @catch_exceptions
596     def release(self, fh):
597         if fh in self._filehandles:
598             _logger.debug("arv-mount release fh %i", fh)
599             try:
600                 self._filehandles[fh].flush()
601             except Exception:
602                 raise
603             finally:
604                 self._filehandles[fh].release()
605                 del self._filehandles[fh]
606         self.inodes.inode_cache.cap_cache()
607
608     def releasedir(self, fh):
609         self.release(fh)
610
611     @catch_exceptions
612     def opendir(self, inode, ctx=None):
613         _logger.debug("arv-mount opendir: inode %i", inode)
614
615         if inode in self.inodes:
616             p = self.inodes[inode]
617         else:
618             raise llfuse.FUSEError(errno.ENOENT)
619
620         if not isinstance(p, Directory):
621             raise llfuse.FUSEError(errno.ENOTDIR)
622
623         fh = next(self._filehandles_counter)
624         if p.parent_inode in self.inodes:
625             parent = self.inodes[p.parent_inode]
626         else:
627             raise llfuse.FUSEError(errno.EIO)
628
629         # update atime
630         self.inodes.touch(p)
631
632         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
633         return fh
634
635     @catch_exceptions
636     def readdir(self, fh, off):
637         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
638
639         if fh in self._filehandles:
640             handle = self._filehandles[fh]
641         else:
642             raise llfuse.FUSEError(errno.EBADF)
643
644         e = off
645         while e < len(handle.entries):
646             if handle.entries[e][1].inode in self.inodes:
647                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
648             e += 1
649
650     @catch_exceptions
651     def statfs(self, ctx=None):
652         st = llfuse.StatvfsData()
653         st.f_bsize = 128 * 1024
654         st.f_blocks = 0
655         st.f_files = 0
656
657         st.f_bfree = 0
658         st.f_bavail = 0
659
660         st.f_ffree = 0
661         st.f_favail = 0
662
663         st.f_frsize = 0
664         return st
665
666     def _check_writable(self, inode_parent):
667         if not self.enable_write:
668             raise llfuse.FUSEError(errno.EROFS)
669
670         if inode_parent in self.inodes:
671             p = self.inodes[inode_parent]
672         else:
673             raise llfuse.FUSEError(errno.ENOENT)
674
675         if not isinstance(p, Directory):
676             raise llfuse.FUSEError(errno.ENOTDIR)
677
678         if not p.writable():
679             raise llfuse.FUSEError(errno.EPERM)
680
681         return p
682
683     @catch_exceptions
684     def create(self, inode_parent, name, mode, flags, ctx=None):
685         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
686
687         p = self._check_writable(inode_parent)
688         p.create(name)
689
690         # The file entry should have been implicitly created by callback.
691         f = p[name]
692         fh = next(self._filehandles_counter)
693         self._filehandles[fh] = FileHandle(fh, f)
694         self.inodes.touch(p)
695
696         f.inc_ref()
697         return (fh, self.getattr(f.inode))
698
699     @catch_exceptions
700     def mkdir(self, inode_parent, name, mode, ctx=None):
701         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
702
703         p = self._check_writable(inode_parent)
704         p.mkdir(name)
705
706         # The dir entry should have been implicitly created by callback.
707         d = p[name]
708
709         d.inc_ref()
710         return self.getattr(d.inode)
711
712     @catch_exceptions
713     def unlink(self, inode_parent, name, ctx=None):
714         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
715         p = self._check_writable(inode_parent)
716         p.unlink(name)
717
718     @catch_exceptions
719     def rmdir(self, inode_parent, name, ctx=None):
720         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
721         p = self._check_writable(inode_parent)
722         p.rmdir(name)
723
724     @catch_exceptions
725     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
726         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
727         src = self._check_writable(inode_parent_old)
728         dest = self._check_writable(inode_parent_new)
729         dest.rename(name_old, name_new, src)
730
731     @catch_exceptions
732     def flush(self, fh):
733         if fh in self._filehandles:
734             self._filehandles[fh].flush()
735
736     def fsync(self, fh, datasync):
737         self.flush(fh)
738
739     def fsyncdir(self, fh, datasync):
740         self.flush(fh)