Merge branch '11332-fix-crunchscript' refs #11332
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69 import arvados.keep
70
71 import Queue
72
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
75 # details.
76
77 if hasattr(llfuse, 'capi'):
78     # llfuse < 0.42
79     llfuse.capi._notify_queue = Queue.Queue()
80 else:
81     # llfuse >= 0.42
82     llfuse._notify_queue = Queue.Queue()
83
84 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
85 from fusefile import StringFile, FuseArvadosFile
86
87 _logger = logging.getLogger('arvados.arvados_fuse')
88
89 # Uncomment this to enable llfuse debug logging.
90 # log_handler = logging.StreamHandler()
91 # llogger = logging.getLogger('llfuse')
92 # llogger.addHandler(log_handler)
93 # llogger.setLevel(logging.DEBUG)
94
95 class Handle(object):
96     """Connects a numeric file handle to a File or Directory object that has
97     been opened by the client."""
98
99     def __init__(self, fh, obj):
100         self.fh = fh
101         self.obj = obj
102         self.obj.inc_use()
103
104     def release(self):
105         self.obj.dec_use()
106
107     def flush(self):
108         pass
109
110
111 class FileHandle(Handle):
112     """Connects a numeric file handle to a File  object that has
113     been opened by the client."""
114
115     def flush(self):
116         if self.obj.writable():
117             return self.obj.flush()
118
119
120 class DirectoryHandle(Handle):
121     """Connects a numeric file handle to a Directory object that has
122     been opened by the client."""
123
124     def __init__(self, fh, dirobj, entries):
125         super(DirectoryHandle, self).__init__(fh, dirobj)
126         self.entries = entries
127
128
129 class InodeCache(object):
130     """Records the memory footprint of objects and when they are last used.
131
132     When the cache limit is exceeded, the least recently used objects are
133     cleared.  Clearing the object means discarding its contents to release
134     memory.  The next time the object is accessed, it must be re-fetched from
135     the server.  Note that the inode cache limit is a soft limit; the cache
136     limit may be exceeded if necessary to load very large objects, it may also
137     be exceeded if open file handles prevent objects from being cleared.
138
139     """
140
141     def __init__(self, cap, min_entries=4):
142         self._entries = collections.OrderedDict()
143         self._by_uuid = {}
144         self.cap = cap
145         self._total = 0
146         self.min_entries = min_entries
147
148     def total(self):
149         return self._total
150
151     def _remove(self, obj, clear):
152         if clear:
153             if obj.in_use():
154                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
155                 return
156             if obj.has_ref(True):
157                 obj.kernel_invalidate()
158                 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
159                 return
160             obj.clear()
161
162         # The llfuse lock is released in del_entry(), which is called by
163         # Directory.clear().  While the llfuse lock is released, it can happen
164         # that a reentrant call removes this entry before this call gets to it.
165         # Ensure that the entry is still valid before trying to remove it.
166         if obj.inode not in self._entries:
167             return
168
169         self._total -= obj.cache_size
170         del self._entries[obj.inode]
171         if obj.cache_uuid:
172             self._by_uuid[obj.cache_uuid].remove(obj)
173             if not self._by_uuid[obj.cache_uuid]:
174                 del self._by_uuid[obj.cache_uuid]
175             obj.cache_uuid = None
176         if clear:
177             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
178
179     def cap_cache(self):
180         if self._total > self.cap:
181             for ent in self._entries.values():
182                 if self._total < self.cap or len(self._entries) < self.min_entries:
183                     break
184                 self._remove(ent, True)
185
186     def manage(self, obj):
187         if obj.persisted():
188             obj.cache_size = obj.objsize()
189             self._entries[obj.inode] = obj
190             obj.cache_uuid = obj.uuid()
191             if obj.cache_uuid:
192                 if obj.cache_uuid not in self._by_uuid:
193                     self._by_uuid[obj.cache_uuid] = [obj]
194                 else:
195                     if obj not in self._by_uuid[obj.cache_uuid]:
196                         self._by_uuid[obj.cache_uuid].append(obj)
197             self._total += obj.objsize()
198             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
199             self.cap_cache()
200
201     def touch(self, obj):
202         if obj.persisted():
203             if obj.inode in self._entries:
204                 self._remove(obj, False)
205             self.manage(obj)
206
207     def unmanage(self, obj):
208         if obj.persisted() and obj.inode in self._entries:
209             self._remove(obj, True)
210
211     def find_by_uuid(self, uuid):
212         return self._by_uuid.get(uuid, [])
213
214     def clear(self):
215         self._entries.clear()
216         self._by_uuid.clear()
217         self._total = 0
218
219 class Inodes(object):
220     """Manage the set of inodes.  This is the mapping from a numeric id
221     to a concrete File or Directory object"""
222
223     def __init__(self, inode_cache, encoding="utf-8"):
224         self._entries = {}
225         self._counter = itertools.count(llfuse.ROOT_INODE)
226         self.inode_cache = inode_cache
227         self.encoding = encoding
228         self.deferred_invalidations = []
229
230     def __getitem__(self, item):
231         return self._entries[item]
232
233     def __setitem__(self, key, item):
234         self._entries[key] = item
235
236     def __iter__(self):
237         return self._entries.iterkeys()
238
239     def items(self):
240         return self._entries.items()
241
242     def __contains__(self, k):
243         return k in self._entries
244
245     def touch(self, entry):
246         entry._atime = time.time()
247         self.inode_cache.touch(entry)
248
249     def add_entry(self, entry):
250         entry.inode = next(self._counter)
251         if entry.inode == llfuse.ROOT_INODE:
252             entry.inc_ref()
253         self._entries[entry.inode] = entry
254         self.inode_cache.manage(entry)
255         return entry
256
257     def del_entry(self, entry):
258         if entry.ref_count == 0:
259             self.inode_cache.unmanage(entry)
260             del self._entries[entry.inode]
261             with llfuse.lock_released:
262                 entry.finalize()
263             self.invalidate_inode(entry.inode)
264             entry.inode = None
265         else:
266             entry.dead = True
267             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
268
269     def invalidate_inode(self, inode):
270         llfuse.invalidate_inode(inode)
271
272     def invalidate_entry(self, inode, name):
273         llfuse.invalidate_entry(inode, name.encode(self.encoding))
274
275     def clear(self):
276         self.inode_cache.clear()
277
278         for k,v in self._entries.items():
279             try:
280                 v.finalize()
281             except Exception as e:
282                 _logger.exception("Error during finalize of inode %i", k)
283
284         self._entries.clear()
285
286
287 def catch_exceptions(orig_func):
288     """Catch uncaught exceptions and log them consistently."""
289
290     @functools.wraps(orig_func)
291     def catch_exceptions_wrapper(self, *args, **kwargs):
292         try:
293             return orig_func(self, *args, **kwargs)
294         except llfuse.FUSEError:
295             raise
296         except EnvironmentError as e:
297             raise llfuse.FUSEError(e.errno)
298         except arvados.errors.KeepWriteError as e:
299             _logger.error("Keep write error: " + str(e))
300             raise llfuse.FUSEError(errno.EIO)
301         except arvados.errors.NotFoundError as e:
302             _logger.error("Block not found error: " + str(e))
303             raise llfuse.FUSEError(errno.EIO)
304         except:
305             _logger.exception("Unhandled exception during FUSE operation")
306             raise llfuse.FUSEError(errno.EIO)
307
308     return catch_exceptions_wrapper
309
310
311 class Operations(llfuse.Operations):
312     """This is the main interface with llfuse.
313
314     The methods on this object are called by llfuse threads to service FUSE
315     events to query and read from the file system.
316
317     llfuse has its own global lock which is acquired before calling a request handler,
318     so request handlers do not run concurrently unless the lock is explicitly released
319     using 'with llfuse.lock_released:'
320
321     """
322
323     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
324         super(Operations, self).__init__()
325
326         self._api_client = api_client
327
328         if not inode_cache:
329             inode_cache = InodeCache(cap=256*1024*1024)
330         self.inodes = Inodes(inode_cache, encoding=encoding)
331         self.uid = uid
332         self.gid = gid
333         self.enable_write = enable_write
334
335         # dict of inode to filehandle
336         self._filehandles = {}
337         self._filehandles_counter = itertools.count(0)
338
339         # Other threads that need to wait until the fuse driver
340         # is fully initialized should wait() on this event object.
341         self.initlock = threading.Event()
342
343         # If we get overlapping shutdown events (e.g., fusermount -u
344         # -z and operations.destroy()) llfuse calls forget() on inodes
345         # that have already been deleted. To avoid this, we make
346         # forget() a no-op if called after destroy().
347         self._shutdown_started = threading.Event()
348
349         self.num_retries = num_retries
350
351         self.read_counter = arvados.keep.Counter()
352         self.write_counter = arvados.keep.Counter()
353         self.read_ops_counter = arvados.keep.Counter()
354         self.write_ops_counter = arvados.keep.Counter()
355
356         self.events = None
357
358     def init(self):
359         # Allow threads that are waiting for the driver to be finished
360         # initializing to continue
361         self.initlock.set()
362
363     @catch_exceptions
364     def destroy(self):
365         self._shutdown_started.set()
366         if self.events:
367             self.events.close()
368             self.events = None
369
370         if llfuse.lock.acquire():
371             # llfuse < 0.42
372             self.inodes.clear()
373             llfuse.lock.release()
374         else:
375             # llfuse >= 0.42
376             self.inodes.clear()
377
378     def access(self, inode, mode, ctx):
379         return True
380
381     def listen_for_events(self):
382         self.events = arvados.events.subscribe(
383             self._api_client,
384             [["event_type", "in", ["create", "update", "delete"]]],
385             self.on_event)
386
387     @catch_exceptions
388     def on_event(self, ev):
389         if 'event_type' not in ev:
390             return
391         with llfuse.lock:
392             new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
393             pdh = new_attrs.get("portable_data_hash")
394             # new_attributes.modified_at currently lacks
395             # subsecond precision (see #6347) so use event_at
396             # which should always be the same.
397             stamp = ev.get("event_at")
398
399             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
400                 item.invalidate()
401                 if stamp and pdh and ev.get("object_kind") == "arvados#collection":
402                     item.update(to_record_version=(stamp, pdh))
403                 else:
404                     item.update()
405
406             oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
407             newowner = ev.get("object_owner_uuid")
408             for parent in (
409                     self.inodes.inode_cache.find_by_uuid(oldowner) +
410                     self.inodes.inode_cache.find_by_uuid(newowner)):
411                 parent.invalidate()
412                 parent.update()
413
414     @catch_exceptions
415     def getattr(self, inode, ctx=None):
416         if inode not in self.inodes:
417             raise llfuse.FUSEError(errno.ENOENT)
418
419         e = self.inodes[inode]
420
421         entry = llfuse.EntryAttributes()
422         entry.st_ino = inode
423         entry.generation = 0
424         entry.entry_timeout = 60 if e.allow_dirent_cache else 0
425         entry.attr_timeout = 60 if e.allow_attr_cache else 0
426
427         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
428         if isinstance(e, Directory):
429             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
430         else:
431             entry.st_mode |= stat.S_IFREG
432             if isinstance(e, FuseArvadosFile):
433                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
434
435         if self.enable_write and e.writable():
436             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
437
438         entry.st_nlink = 1
439         entry.st_uid = self.uid
440         entry.st_gid = self.gid
441         entry.st_rdev = 0
442
443         entry.st_size = e.size()
444
445         entry.st_blksize = 512
446         entry.st_blocks = (entry.st_size/512)+1
447         if hasattr(entry, 'st_atime_ns'):
448             # llfuse >= 0.42
449             entry.st_atime_ns = int(e.atime() * 1000000000)
450             entry.st_mtime_ns = int(e.mtime() * 1000000000)
451             entry.st_ctime_ns = int(e.mtime() * 1000000000)
452         else:
453             # llfuse < 0.42
454             entry.st_atime = int(e.atime)
455             entry.st_mtime = int(e.mtime)
456             entry.st_ctime = int(e.mtime)
457
458         return entry
459
460     @catch_exceptions
461     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
462         entry = self.getattr(inode)
463
464         if fh is not None and fh in self._filehandles:
465             handle = self._filehandles[fh]
466             e = handle.obj
467         else:
468             e = self.inodes[inode]
469
470         if fields is None:
471             # llfuse < 0.42
472             update_size = attr.st_size is not None
473         else:
474             # llfuse >= 0.42
475             update_size = fields.update_size
476         if update_size and isinstance(e, FuseArvadosFile):
477             with llfuse.lock_released:
478                 e.arvfile.truncate(attr.st_size)
479                 entry.st_size = e.arvfile.size()
480
481         return entry
482
483     @catch_exceptions
484     def lookup(self, parent_inode, name, ctx=None):
485         name = unicode(name, self.inodes.encoding)
486         inode = None
487
488         if name == '.':
489             inode = parent_inode
490         else:
491             if parent_inode in self.inodes:
492                 p = self.inodes[parent_inode]
493                 self.inodes.touch(p)
494                 if name == '..':
495                     inode = p.parent_inode
496                 elif isinstance(p, Directory) and name in p:
497                     inode = p[name].inode
498
499         if inode != None:
500             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
501                       parent_inode, name, inode)
502             self.inodes[inode].inc_ref()
503             return self.getattr(inode)
504         else:
505             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
506                       parent_inode, name)
507             raise llfuse.FUSEError(errno.ENOENT)
508
509     @catch_exceptions
510     def forget(self, inodes):
511         if self._shutdown_started.is_set():
512             return
513         for inode, nlookup in inodes:
514             ent = self.inodes[inode]
515             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
516             if ent.dec_ref(nlookup) == 0 and ent.dead:
517                 self.inodes.del_entry(ent)
518
519     @catch_exceptions
520     def open(self, inode, flags, ctx=None):
521         if inode in self.inodes:
522             p = self.inodes[inode]
523         else:
524             raise llfuse.FUSEError(errno.ENOENT)
525
526         if isinstance(p, Directory):
527             raise llfuse.FUSEError(errno.EISDIR)
528
529         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
530             raise llfuse.FUSEError(errno.EPERM)
531
532         fh = next(self._filehandles_counter)
533         self._filehandles[fh] = FileHandle(fh, p)
534         self.inodes.touch(p)
535
536         # Normally, we will have received an "update" event if the
537         # parent collection is stale here. However, even if the parent
538         # collection hasn't changed, the manifest might have been
539         # fetched so long ago that the signatures on the data block
540         # locators have expired. Calling checkupdate() on all
541         # ancestors ensures the signatures will be refreshed if
542         # necessary.
543         while p.parent_inode in self.inodes:
544             if p == self.inodes[p.parent_inode]:
545                 break
546             p = self.inodes[p.parent_inode]
547             self.inodes.touch(p)
548             p.checkupdate()
549
550         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
551
552         return fh
553
554     @catch_exceptions
555     def read(self, fh, off, size):
556         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
557         self.read_ops_counter.add(1)
558
559         if fh in self._filehandles:
560             handle = self._filehandles[fh]
561         else:
562             raise llfuse.FUSEError(errno.EBADF)
563
564         self.inodes.touch(handle.obj)
565
566         r = handle.obj.readfrom(off, size, self.num_retries)
567         if r:
568             self.read_counter.add(len(r))
569         return r
570
571     @catch_exceptions
572     def write(self, fh, off, buf):
573         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
574         self.write_ops_counter.add(1)
575
576         if fh in self._filehandles:
577             handle = self._filehandles[fh]
578         else:
579             raise llfuse.FUSEError(errno.EBADF)
580
581         if not handle.obj.writable():
582             raise llfuse.FUSEError(errno.EPERM)
583
584         self.inodes.touch(handle.obj)
585
586         w = handle.obj.writeto(off, buf, self.num_retries)
587         if w:
588             self.write_counter.add(w)
589         return w
590
591     @catch_exceptions
592     def release(self, fh):
593         if fh in self._filehandles:
594             _logger.debug("arv-mount release fh %i", fh)
595             try:
596                 self._filehandles[fh].flush()
597             except Exception:
598                 raise
599             finally:
600                 self._filehandles[fh].release()
601                 del self._filehandles[fh]
602         self.inodes.inode_cache.cap_cache()
603
604     def releasedir(self, fh):
605         self.release(fh)
606
607     @catch_exceptions
608     def opendir(self, inode, ctx=None):
609         _logger.debug("arv-mount opendir: inode %i", inode)
610
611         if inode in self.inodes:
612             p = self.inodes[inode]
613         else:
614             raise llfuse.FUSEError(errno.ENOENT)
615
616         if not isinstance(p, Directory):
617             raise llfuse.FUSEError(errno.ENOTDIR)
618
619         fh = next(self._filehandles_counter)
620         if p.parent_inode in self.inodes:
621             parent = self.inodes[p.parent_inode]
622         else:
623             raise llfuse.FUSEError(errno.EIO)
624
625         # update atime
626         self.inodes.touch(p)
627
628         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
629         return fh
630
631     @catch_exceptions
632     def readdir(self, fh, off):
633         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
634
635         if fh in self._filehandles:
636             handle = self._filehandles[fh]
637         else:
638             raise llfuse.FUSEError(errno.EBADF)
639
640         e = off
641         while e < len(handle.entries):
642             if handle.entries[e][1].inode in self.inodes:
643                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
644             e += 1
645
646     @catch_exceptions
647     def statfs(self, ctx=None):
648         st = llfuse.StatvfsData()
649         st.f_bsize = 128 * 1024
650         st.f_blocks = 0
651         st.f_files = 0
652
653         st.f_bfree = 0
654         st.f_bavail = 0
655
656         st.f_ffree = 0
657         st.f_favail = 0
658
659         st.f_frsize = 0
660         return st
661
662     def _check_writable(self, inode_parent):
663         if not self.enable_write:
664             raise llfuse.FUSEError(errno.EROFS)
665
666         if inode_parent in self.inodes:
667             p = self.inodes[inode_parent]
668         else:
669             raise llfuse.FUSEError(errno.ENOENT)
670
671         if not isinstance(p, Directory):
672             raise llfuse.FUSEError(errno.ENOTDIR)
673
674         if not p.writable():
675             raise llfuse.FUSEError(errno.EPERM)
676
677         return p
678
679     @catch_exceptions
680     def create(self, inode_parent, name, mode, flags, ctx=None):
681         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
682
683         p = self._check_writable(inode_parent)
684         p.create(name)
685
686         # The file entry should have been implicitly created by callback.
687         f = p[name]
688         fh = next(self._filehandles_counter)
689         self._filehandles[fh] = FileHandle(fh, f)
690         self.inodes.touch(p)
691
692         f.inc_ref()
693         return (fh, self.getattr(f.inode))
694
695     @catch_exceptions
696     def mkdir(self, inode_parent, name, mode, ctx=None):
697         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
698
699         p = self._check_writable(inode_parent)
700         p.mkdir(name)
701
702         # The dir entry should have been implicitly created by callback.
703         d = p[name]
704
705         d.inc_ref()
706         return self.getattr(d.inode)
707
708     @catch_exceptions
709     def unlink(self, inode_parent, name, ctx=None):
710         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
711         p = self._check_writable(inode_parent)
712         p.unlink(name)
713
714     @catch_exceptions
715     def rmdir(self, inode_parent, name, ctx=None):
716         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
717         p = self._check_writable(inode_parent)
718         p.rmdir(name)
719
720     @catch_exceptions
721     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
722         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
723         src = self._check_writable(inode_parent_old)
724         dest = self._check_writable(inode_parent_new)
725         dest.rename(name_old, name_new, src)
726
727     @catch_exceptions
728     def flush(self, fh):
729         if fh in self._filehandles:
730             self._filehandles[fh].flush()
731
732     def fsync(self, fh, datasync):
733         self.flush(fh)
734
735     def fsyncdir(self, fh, datasync):
736         self.flush(fh)