10008: Merge branch 'master' into 10008-flaky-token-test
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69 import arvados.keep
70
71 import Queue
72
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
75 # details.
76
77 llfuse.capi._notify_queue = Queue.Queue()
78
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
81
82 _logger = logging.getLogger('arvados.arvados_fuse')
83
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
89
90 class Handle(object):
91     """Connects a numeric file handle to a File or Directory object that has
92     been opened by the client."""
93
94     def __init__(self, fh, obj):
95         self.fh = fh
96         self.obj = obj
97         self.obj.inc_use()
98
99     def release(self):
100         self.obj.dec_use()
101
102     def flush(self):
103         if self.obj.writable():
104             return self.obj.flush()
105
106
107 class FileHandle(Handle):
108     """Connects a numeric file handle to a File  object that has
109     been opened by the client."""
110     pass
111
112
113 class DirectoryHandle(Handle):
114     """Connects a numeric file handle to a Directory object that has
115     been opened by the client."""
116
117     def __init__(self, fh, dirobj, entries):
118         super(DirectoryHandle, self).__init__(fh, dirobj)
119         self.entries = entries
120
121
122 class InodeCache(object):
123     """Records the memory footprint of objects and when they are last used.
124
125     When the cache limit is exceeded, the least recently used objects are
126     cleared.  Clearing the object means discarding its contents to release
127     memory.  The next time the object is accessed, it must be re-fetched from
128     the server.  Note that the inode cache limit is a soft limit; the cache
129     limit may be exceeded if necessary to load very large objects, it may also
130     be exceeded if open file handles prevent objects from being cleared.
131
132     """
133
134     def __init__(self, cap, min_entries=4):
135         self._entries = collections.OrderedDict()
136         self._by_uuid = {}
137         self.cap = cap
138         self._total = 0
139         self.min_entries = min_entries
140
141     def total(self):
142         return self._total
143
144     def _remove(self, obj, clear):
145         if clear:
146             if obj.in_use():
147                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
148                 return
149             if obj.has_ref(True):
150                 obj.kernel_invalidate()
151                 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
152                 return
153             obj.clear()
154
155         # The llfuse lock is released in del_entry(), which is called by
156         # Directory.clear().  While the llfuse lock is released, it can happen
157         # that a reentrant call removes this entry before this call gets to it.
158         # Ensure that the entry is still valid before trying to remove it.
159         if obj.inode not in self._entries:
160             return
161
162         self._total -= obj.cache_size
163         del self._entries[obj.inode]
164         if obj.cache_uuid:
165             self._by_uuid[obj.cache_uuid].remove(obj)
166             if not self._by_uuid[obj.cache_uuid]:
167                 del self._by_uuid[obj.cache_uuid]
168             obj.cache_uuid = None
169         if clear:
170             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
171
172     def cap_cache(self):
173         if self._total > self.cap:
174             for ent in self._entries.values():
175                 if self._total < self.cap or len(self._entries) < self.min_entries:
176                     break
177                 self._remove(ent, True)
178
179     def manage(self, obj):
180         if obj.persisted():
181             obj.cache_size = obj.objsize()
182             self._entries[obj.inode] = obj
183             obj.cache_uuid = obj.uuid()
184             if obj.cache_uuid:
185                 if obj.cache_uuid not in self._by_uuid:
186                     self._by_uuid[obj.cache_uuid] = [obj]
187                 else:
188                     if obj not in self._by_uuid[obj.cache_uuid]:
189                         self._by_uuid[obj.cache_uuid].append(obj)
190             self._total += obj.objsize()
191             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
192             self.cap_cache()
193
194     def touch(self, obj):
195         if obj.persisted():
196             if obj.inode in self._entries:
197                 self._remove(obj, False)
198             self.manage(obj)
199
200     def unmanage(self, obj):
201         if obj.persisted() and obj.inode in self._entries:
202             self._remove(obj, True)
203
204     def find_by_uuid(self, uuid):
205         return self._by_uuid.get(uuid, [])
206
207     def clear(self):
208         self._entries.clear()
209         self._by_uuid.clear()
210         self._total = 0
211
212 class Inodes(object):
213     """Manage the set of inodes.  This is the mapping from a numeric id
214     to a concrete File or Directory object"""
215
216     def __init__(self, inode_cache, encoding="utf-8"):
217         self._entries = {}
218         self._counter = itertools.count(llfuse.ROOT_INODE)
219         self.inode_cache = inode_cache
220         self.encoding = encoding
221         self.deferred_invalidations = []
222
223     def __getitem__(self, item):
224         return self._entries[item]
225
226     def __setitem__(self, key, item):
227         self._entries[key] = item
228
229     def __iter__(self):
230         return self._entries.iterkeys()
231
232     def items(self):
233         return self._entries.items()
234
235     def __contains__(self, k):
236         return k in self._entries
237
238     def touch(self, entry):
239         entry._atime = time.time()
240         self.inode_cache.touch(entry)
241
242     def add_entry(self, entry):
243         entry.inode = next(self._counter)
244         if entry.inode == llfuse.ROOT_INODE:
245             entry.inc_ref()
246         self._entries[entry.inode] = entry
247         self.inode_cache.manage(entry)
248         return entry
249
250     def del_entry(self, entry):
251         if entry.ref_count == 0:
252             self.inode_cache.unmanage(entry)
253             del self._entries[entry.inode]
254             with llfuse.lock_released:
255                 entry.finalize()
256             self.invalidate_inode(entry.inode)
257             entry.inode = None
258         else:
259             entry.dead = True
260             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
261
262     def invalidate_inode(self, inode):
263         llfuse.invalidate_inode(inode)
264
265     def invalidate_entry(self, inode, name):
266         llfuse.invalidate_entry(inode, name.encode(self.encoding))
267
268     def clear(self):
269         self.inode_cache.clear()
270
271         for k,v in self._entries.items():
272             try:
273                 v.finalize()
274             except Exception as e:
275                 _logger.exception("Error during finalize of inode %i", k)
276
277         self._entries.clear()
278
279
280 def catch_exceptions(orig_func):
281     """Catch uncaught exceptions and log them consistently."""
282
283     @functools.wraps(orig_func)
284     def catch_exceptions_wrapper(self, *args, **kwargs):
285         try:
286             return orig_func(self, *args, **kwargs)
287         except llfuse.FUSEError:
288             raise
289         except EnvironmentError as e:
290             raise llfuse.FUSEError(e.errno)
291         except arvados.errors.KeepWriteError as e:
292             _logger.error("Keep write error: " + str(e))
293             raise llfuse.FUSEError(errno.EIO)
294         except arvados.errors.NotFoundError as e:
295             _logger.error("Block not found error: " + str(e))
296             raise llfuse.FUSEError(errno.EIO)
297         except:
298             _logger.exception("Unhandled exception during FUSE operation")
299             raise llfuse.FUSEError(errno.EIO)
300
301     return catch_exceptions_wrapper
302
303
304 class Operations(llfuse.Operations):
305     """This is the main interface with llfuse.
306
307     The methods on this object are called by llfuse threads to service FUSE
308     events to query and read from the file system.
309
310     llfuse has its own global lock which is acquired before calling a request handler,
311     so request handlers do not run concurrently unless the lock is explicitly released
312     using 'with llfuse.lock_released:'
313
314     """
315
316     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
317         super(Operations, self).__init__()
318
319         self._api_client = api_client
320
321         if not inode_cache:
322             inode_cache = InodeCache(cap=256*1024*1024)
323         self.inodes = Inodes(inode_cache, encoding=encoding)
324         self.uid = uid
325         self.gid = gid
326         self.enable_write = enable_write
327
328         # dict of inode to filehandle
329         self._filehandles = {}
330         self._filehandles_counter = itertools.count(0)
331
332         # Other threads that need to wait until the fuse driver
333         # is fully initialized should wait() on this event object.
334         self.initlock = threading.Event()
335
336         # If we get overlapping shutdown events (e.g., fusermount -u
337         # -z and operations.destroy()) llfuse calls forget() on inodes
338         # that have already been deleted. To avoid this, we make
339         # forget() a no-op if called after destroy().
340         self._shutdown_started = threading.Event()
341
342         self.num_retries = num_retries
343
344         self.read_counter = arvados.keep.Counter()
345         self.write_counter = arvados.keep.Counter()
346         self.read_ops_counter = arvados.keep.Counter()
347         self.write_ops_counter = arvados.keep.Counter()
348
349         self.events = None
350
351     def init(self):
352         # Allow threads that are waiting for the driver to be finished
353         # initializing to continue
354         self.initlock.set()
355
356     @catch_exceptions
357     def destroy(self):
358         with llfuse.lock:
359             self._shutdown_started.set()
360             if self.events:
361                 self.events.close()
362                 self.events = None
363
364             self.inodes.clear()
365
366     def access(self, inode, mode, ctx):
367         return True
368
369     def listen_for_events(self):
370         self.events = arvados.events.subscribe(self._api_client,
371                                  [["event_type", "in", ["create", "update", "delete"]]],
372                                  self.on_event)
373
374     @catch_exceptions
375     def on_event(self, ev):
376         if 'event_type' not in ev:
377             return
378         with llfuse.lock:
379             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
380                 item.invalidate()
381                 if ev["object_kind"] == "arvados#collection":
382                     new_attr = (ev.get("properties") and
383                                 ev["properties"].get("new_attributes") and
384                                 ev["properties"]["new_attributes"])
385
386                     # new_attributes.modified_at currently lacks
387                     # subsecond precision (see #6347) so use event_at
388                     # which should always be the same.
389                     record_version = (
390                         (ev["event_at"], new_attr["portable_data_hash"])
391                         if new_attr else None)
392
393                     item.update(to_record_version=record_version)
394                 else:
395                     item.update()
396
397             oldowner = (
398                 ev.get("properties") and
399                 ev["properties"].get("old_attributes") and
400                 ev["properties"]["old_attributes"].get("owner_uuid"))
401             newowner = ev["object_owner_uuid"]
402             for parent in (
403                     self.inodes.inode_cache.find_by_uuid(oldowner) +
404                     self.inodes.inode_cache.find_by_uuid(newowner)):
405                 parent.invalidate()
406                 parent.update()
407
408
409     @catch_exceptions
410     def getattr(self, inode):
411         if inode not in self.inodes:
412             raise llfuse.FUSEError(errno.ENOENT)
413
414         e = self.inodes[inode]
415
416         entry = llfuse.EntryAttributes()
417         entry.st_ino = inode
418         entry.generation = 0
419         entry.entry_timeout = 60 if e.allow_dirent_cache else 0
420         entry.attr_timeout = 60 if e.allow_attr_cache else 0
421
422         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
423         if isinstance(e, Directory):
424             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
425         else:
426             entry.st_mode |= stat.S_IFREG
427             if isinstance(e, FuseArvadosFile):
428                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
429
430         if self.enable_write and e.writable():
431             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
432
433         entry.st_nlink = 1
434         entry.st_uid = self.uid
435         entry.st_gid = self.gid
436         entry.st_rdev = 0
437
438         entry.st_size = e.size()
439
440         entry.st_blksize = 512
441         entry.st_blocks = (entry.st_size/512)+1
442         entry.st_atime = int(e.atime())
443         entry.st_mtime = int(e.mtime())
444         entry.st_ctime = int(e.mtime())
445
446         return entry
447
448     @catch_exceptions
449     def setattr(self, inode, attr):
450         entry = self.getattr(inode)
451
452         e = self.inodes[inode]
453
454         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
455             with llfuse.lock_released:
456                 e.arvfile.truncate(attr.st_size)
457                 entry.st_size = e.arvfile.size()
458
459         return entry
460
461     @catch_exceptions
462     def lookup(self, parent_inode, name):
463         name = unicode(name, self.inodes.encoding)
464         inode = None
465
466         if name == '.':
467             inode = parent_inode
468         else:
469             if parent_inode in self.inodes:
470                 p = self.inodes[parent_inode]
471                 self.inodes.touch(p)
472                 if name == '..':
473                     inode = p.parent_inode
474                 elif isinstance(p, Directory) and name in p:
475                     inode = p[name].inode
476
477         if inode != None:
478             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
479                       parent_inode, name, inode)
480             self.inodes[inode].inc_ref()
481             return self.getattr(inode)
482         else:
483             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
484                       parent_inode, name)
485             raise llfuse.FUSEError(errno.ENOENT)
486
487     @catch_exceptions
488     def forget(self, inodes):
489         if self._shutdown_started.is_set():
490             return
491         for inode, nlookup in inodes:
492             ent = self.inodes[inode]
493             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
494             if ent.dec_ref(nlookup) == 0 and ent.dead:
495                 self.inodes.del_entry(ent)
496
497     @catch_exceptions
498     def open(self, inode, flags):
499         if inode in self.inodes:
500             p = self.inodes[inode]
501         else:
502             raise llfuse.FUSEError(errno.ENOENT)
503
504         if isinstance(p, Directory):
505             raise llfuse.FUSEError(errno.EISDIR)
506
507         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
508             raise llfuse.FUSEError(errno.EPERM)
509
510         fh = next(self._filehandles_counter)
511         self._filehandles[fh] = FileHandle(fh, p)
512         self.inodes.touch(p)
513         while p.parent_inode in self.inodes:
514             if p == self.inodes[p.parent_inode]:
515                 break
516             p = self.inodes[p.parent_inode]
517             self.inodes.touch(p)
518             p.checkupdate()
519
520         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
521
522         return fh
523
524     @catch_exceptions
525     def read(self, fh, off, size):
526         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
527         self.read_ops_counter.add(1)
528
529         if fh in self._filehandles:
530             handle = self._filehandles[fh]
531         else:
532             raise llfuse.FUSEError(errno.EBADF)
533
534         self.inodes.touch(handle.obj)
535
536         r = handle.obj.readfrom(off, size, self.num_retries)
537         if r:
538             self.read_counter.add(len(r))
539         return r
540
541     @catch_exceptions
542     def write(self, fh, off, buf):
543         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
544         self.write_ops_counter.add(1)
545
546         if fh in self._filehandles:
547             handle = self._filehandles[fh]
548         else:
549             raise llfuse.FUSEError(errno.EBADF)
550
551         if not handle.obj.writable():
552             raise llfuse.FUSEError(errno.EPERM)
553
554         self.inodes.touch(handle.obj)
555
556         w = handle.obj.writeto(off, buf, self.num_retries)
557         if w:
558             self.write_counter.add(w)
559         return w
560
561     @catch_exceptions
562     def release(self, fh):
563         if fh in self._filehandles:
564             try:
565                 self._filehandles[fh].flush()
566             except Exception:
567                 raise
568             finally:
569                 self._filehandles[fh].release()
570                 del self._filehandles[fh]
571         self.inodes.inode_cache.cap_cache()
572
573     def releasedir(self, fh):
574         self.release(fh)
575
576     @catch_exceptions
577     def opendir(self, inode):
578         _logger.debug("arv-mount opendir: inode %i", inode)
579
580         if inode in self.inodes:
581             p = self.inodes[inode]
582         else:
583             raise llfuse.FUSEError(errno.ENOENT)
584
585         if not isinstance(p, Directory):
586             raise llfuse.FUSEError(errno.ENOTDIR)
587
588         fh = next(self._filehandles_counter)
589         if p.parent_inode in self.inodes:
590             parent = self.inodes[p.parent_inode]
591         else:
592             raise llfuse.FUSEError(errno.EIO)
593
594         # update atime
595         self.inodes.touch(p)
596
597         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
598         return fh
599
600     @catch_exceptions
601     def readdir(self, fh, off):
602         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
603
604         if fh in self._filehandles:
605             handle = self._filehandles[fh]
606         else:
607             raise llfuse.FUSEError(errno.EBADF)
608
609         e = off
610         while e < len(handle.entries):
611             if handle.entries[e][1].inode in self.inodes:
612                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
613             e += 1
614
615     @catch_exceptions
616     def statfs(self):
617         st = llfuse.StatvfsData()
618         st.f_bsize = 128 * 1024
619         st.f_blocks = 0
620         st.f_files = 0
621
622         st.f_bfree = 0
623         st.f_bavail = 0
624
625         st.f_ffree = 0
626         st.f_favail = 0
627
628         st.f_frsize = 0
629         return st
630
631     def _check_writable(self, inode_parent):
632         if not self.enable_write:
633             raise llfuse.FUSEError(errno.EROFS)
634
635         if inode_parent in self.inodes:
636             p = self.inodes[inode_parent]
637         else:
638             raise llfuse.FUSEError(errno.ENOENT)
639
640         if not isinstance(p, Directory):
641             raise llfuse.FUSEError(errno.ENOTDIR)
642
643         if not p.writable():
644             raise llfuse.FUSEError(errno.EPERM)
645
646         return p
647
648     @catch_exceptions
649     def create(self, inode_parent, name, mode, flags, ctx):
650         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
651
652         p = self._check_writable(inode_parent)
653         p.create(name)
654
655         # The file entry should have been implicitly created by callback.
656         f = p[name]
657         fh = next(self._filehandles_counter)
658         self._filehandles[fh] = FileHandle(fh, f)
659         self.inodes.touch(p)
660
661         f.inc_ref()
662         return (fh, self.getattr(f.inode))
663
664     @catch_exceptions
665     def mkdir(self, inode_parent, name, mode, ctx):
666         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
667
668         p = self._check_writable(inode_parent)
669         p.mkdir(name)
670
671         # The dir entry should have been implicitly created by callback.
672         d = p[name]
673
674         d.inc_ref()
675         return self.getattr(d.inode)
676
677     @catch_exceptions
678     def unlink(self, inode_parent, name):
679         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
680         p = self._check_writable(inode_parent)
681         p.unlink(name)
682
683     @catch_exceptions
684     def rmdir(self, inode_parent, name):
685         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
686         p = self._check_writable(inode_parent)
687         p.rmdir(name)
688
689     @catch_exceptions
690     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
691         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
692         src = self._check_writable(inode_parent_old)
693         dest = self._check_writable(inode_parent_new)
694         dest.rename(name_old, name_new, src)
695
696     @catch_exceptions
697     def flush(self, fh):
698         if fh in self._filehandles:
699             self._filehandles[fh].flush()
700
701     def fsync(self, fh, datasync):
702         self.flush(fh)
703
704     def fsyncdir(self, fh, datasync):
705         self.flush(fh)