10117: Additional refinement: a directory not in use but still referenced needs
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69 import arvados.keep
70
71 import Queue
72
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
75 # details.
76
77 llfuse.capi._notify_queue = Queue.Queue()
78
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
81
82 _logger = logging.getLogger('arvados.arvados_fuse')
83
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
89
90 class Handle(object):
91     """Connects a numeric file handle to a File or Directory object that has
92     been opened by the client."""
93
94     def __init__(self, fh, obj):
95         self.fh = fh
96         self.obj = obj
97         self.obj.inc_use()
98
99     def release(self):
100         self.obj.dec_use()
101
102     def flush(self):
103         if self.obj.writable():
104             return self.obj.flush()
105
106
107 class FileHandle(Handle):
108     """Connects a numeric file handle to a File  object that has
109     been opened by the client."""
110     pass
111
112
113 class DirectoryHandle(Handle):
114     """Connects a numeric file handle to a Directory object that has
115     been opened by the client."""
116
117     def __init__(self, fh, dirobj, entries):
118         super(DirectoryHandle, self).__init__(fh, dirobj)
119         self.entries = entries
120
121
122 class InodeCache(object):
123     """Records the memory footprint of objects and when they are last used.
124
125     When the cache limit is exceeded, the least recently used objects are
126     cleared.  Clearing the object means discarding its contents to release
127     memory.  The next time the object is accessed, it must be re-fetched from
128     the server.  Note that the inode cache limit is a soft limit; the cache
129     limit may be exceeded if necessary to load very large objects, it may also
130     be exceeded if open file handles prevent objects from being cleared.
131
132     """
133
134     def __init__(self, cap, min_entries=4):
135         self._entries = collections.OrderedDict()
136         self._by_uuid = {}
137         self._counter = itertools.count(0)
138         self.cap = cap
139         self._total = 0
140         self.min_entries = min_entries
141
142     def total(self):
143         return self._total
144
145     def _remove(self, obj, clear):
146         if clear:
147             if obj.in_use():
148                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
149                 return False
150             if obj.has_ref(only_children=True):
151                 obj.invalidate()
152                 _logger.debug("InodeCache invalidate inode %i", obj.inode)
153                 return False
154             obj.clear()
155         self._total -= obj.cache_size
156         del self._entries[obj.cache_priority]
157         if obj.cache_uuid:
158             self._by_uuid[obj.cache_uuid].remove(obj)
159             if not self._by_uuid[obj.cache_uuid]:
160                 del self._by_uuid[obj.cache_uuid]
161             obj.cache_uuid = None
162         if clear:
163             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
164         return True
165
166     def cap_cache(self):
167         if self._total > self.cap:
168             for key in list(self._entries.keys()):
169                 if self._total < self.cap or len(self._entries) < self.min_entries:
170                     break
171                 self._remove(self._entries[key], True)
172
173     def manage(self, obj):
174         if obj.persisted():
175             obj.cache_priority = next(self._counter)
176             obj.cache_size = obj.objsize()
177             self._entries[obj.cache_priority] = obj
178             obj.cache_uuid = obj.uuid()
179             if obj.cache_uuid:
180                 if obj.cache_uuid not in self._by_uuid:
181                     self._by_uuid[obj.cache_uuid] = [obj]
182                 else:
183                     if obj not in self._by_uuid[obj.cache_uuid]:
184                         self._by_uuid[obj.cache_uuid].append(obj)
185             self._total += obj.objsize()
186             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
187             self.cap_cache()
188         else:
189             obj.cache_priority = None
190
191     def touch(self, obj):
192         if obj.persisted():
193             if obj.cache_priority in self._entries:
194                 self._remove(obj, False)
195             self.manage(obj)
196
197     def unmanage(self, obj):
198         if obj.persisted() and obj.cache_priority in self._entries:
199             self._remove(obj, True)
200
201     def find_by_uuid(self, uuid):
202         return self._by_uuid.get(uuid, [])
203
204     def clear(self):
205         self._entries.clear()
206         self._by_uuid.clear()
207         self._total = 0
208
209 class Inodes(object):
210     """Manage the set of inodes.  This is the mapping from a numeric id
211     to a concrete File or Directory object"""
212
213     def __init__(self, inode_cache, encoding="utf-8"):
214         self._entries = {}
215         self._counter = itertools.count(llfuse.ROOT_INODE)
216         self.inode_cache = inode_cache
217         self.encoding = encoding
218         self.deferred_invalidations = []
219
220     def __getitem__(self, item):
221         return self._entries[item]
222
223     def __setitem__(self, key, item):
224         self._entries[key] = item
225
226     def __iter__(self):
227         return self._entries.iterkeys()
228
229     def items(self):
230         return self._entries.items()
231
232     def __contains__(self, k):
233         return k in self._entries
234
235     def touch(self, entry):
236         entry._atime = time.time()
237         self.inode_cache.touch(entry)
238
239     def add_entry(self, entry):
240         entry.inode = next(self._counter)
241         if entry.inode == llfuse.ROOT_INODE:
242             entry.inc_ref()
243         self._entries[entry.inode] = entry
244         self.inode_cache.manage(entry)
245         return entry
246
247     def del_entry(self, entry):
248         if entry.ref_count == 0:
249             self.inode_cache.unmanage(entry)
250             del self._entries[entry.inode]
251             with llfuse.lock_released:
252                 entry.finalize()
253             self.invalidate_inode(entry.inode)
254             entry.inode = None
255         else:
256             entry.dead = True
257             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
258
259     def invalidate_inode(self, inode):
260         llfuse.invalidate_inode(inode)
261
262     def invalidate_entry(self, inode, name):
263         llfuse.invalidate_entry(inode, name.encode(self.encoding))
264
265     def clear(self):
266         self.inode_cache.clear()
267
268         for k,v in self._entries.items():
269             try:
270                 v.finalize()
271             except Exception as e:
272                 _logger.exception("Error during finalize of inode %i", k)
273
274         self._entries.clear()
275
276
277 def catch_exceptions(orig_func):
278     """Catch uncaught exceptions and log them consistently."""
279
280     @functools.wraps(orig_func)
281     def catch_exceptions_wrapper(self, *args, **kwargs):
282         try:
283             return orig_func(self, *args, **kwargs)
284         except llfuse.FUSEError:
285             raise
286         except EnvironmentError as e:
287             raise llfuse.FUSEError(e.errno)
288         except arvados.errors.KeepWriteError as e:
289             _logger.error("Keep write error: " + str(e))
290             raise llfuse.FUSEError(errno.EIO)
291         except arvados.errors.NotFoundError as e:
292             _logger.error("Block not found error: " + str(e))
293             raise llfuse.FUSEError(errno.EIO)
294         except:
295             _logger.exception("Unhandled exception during FUSE operation")
296             raise llfuse.FUSEError(errno.EIO)
297
298     return catch_exceptions_wrapper
299
300
301 class Operations(llfuse.Operations):
302     """This is the main interface with llfuse.
303
304     The methods on this object are called by llfuse threads to service FUSE
305     events to query and read from the file system.
306
307     llfuse has its own global lock which is acquired before calling a request handler,
308     so request handlers do not run concurrently unless the lock is explicitly released
309     using 'with llfuse.lock_released:'
310
311     """
312
313     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
314         super(Operations, self).__init__()
315
316         self._api_client = api_client
317
318         if not inode_cache:
319             inode_cache = InodeCache(cap=256*1024*1024)
320         self.inodes = Inodes(inode_cache, encoding=encoding)
321         self.uid = uid
322         self.gid = gid
323         self.enable_write = enable_write
324
325         # dict of inode to filehandle
326         self._filehandles = {}
327         self._filehandles_counter = itertools.count(0)
328
329         # Other threads that need to wait until the fuse driver
330         # is fully initialized should wait() on this event object.
331         self.initlock = threading.Event()
332
333         # If we get overlapping shutdown events (e.g., fusermount -u
334         # -z and operations.destroy()) llfuse calls forget() on inodes
335         # that have already been deleted. To avoid this, we make
336         # forget() a no-op if called after destroy().
337         self._shutdown_started = threading.Event()
338
339         self.num_retries = num_retries
340
341         self.read_counter = arvados.keep.Counter()
342         self.write_counter = arvados.keep.Counter()
343         self.read_ops_counter = arvados.keep.Counter()
344         self.write_ops_counter = arvados.keep.Counter()
345
346         self.events = None
347
348     def init(self):
349         # Allow threads that are waiting for the driver to be finished
350         # initializing to continue
351         self.initlock.set()
352
353     @catch_exceptions
354     def destroy(self):
355         with llfuse.lock:
356             self._shutdown_started.set()
357             if self.events:
358                 self.events.close()
359                 self.events = None
360
361             self.inodes.clear()
362
363     def access(self, inode, mode, ctx):
364         return True
365
366     def listen_for_events(self):
367         self.events = arvados.events.subscribe(self._api_client,
368                                  [["event_type", "in", ["create", "update", "delete"]]],
369                                  self.on_event)
370
371     @catch_exceptions
372     def on_event(self, ev):
373         if 'event_type' not in ev:
374             return
375         with llfuse.lock:
376             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
377                 item.invalidate()
378                 if ev["object_kind"] == "arvados#collection":
379                     new_attr = (ev.get("properties") and
380                                 ev["properties"].get("new_attributes") and
381                                 ev["properties"]["new_attributes"])
382
383                     # new_attributes.modified_at currently lacks
384                     # subsecond precision (see #6347) so use event_at
385                     # which should always be the same.
386                     record_version = (
387                         (ev["event_at"], new_attr["portable_data_hash"])
388                         if new_attr else None)
389
390                     item.update(to_record_version=record_version)
391                 else:
392                     item.update()
393
394             oldowner = (
395                 ev.get("properties") and
396                 ev["properties"].get("old_attributes") and
397                 ev["properties"]["old_attributes"].get("owner_uuid"))
398             newowner = ev["object_owner_uuid"]
399             for parent in (
400                     self.inodes.inode_cache.find_by_uuid(oldowner) +
401                     self.inodes.inode_cache.find_by_uuid(newowner)):
402                 parent.invalidate()
403                 parent.update()
404
405
406     @catch_exceptions
407     def getattr(self, inode):
408         if inode not in self.inodes:
409             raise llfuse.FUSEError(errno.ENOENT)
410
411         e = self.inodes[inode]
412
413         entry = llfuse.EntryAttributes()
414         entry.st_ino = inode
415         entry.generation = 0
416         entry.entry_timeout = 60 if e.allow_dirent_cache else 0
417         entry.attr_timeout = 60 if e.allow_attr_cache else 0
418
419         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
420         if isinstance(e, Directory):
421             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
422         else:
423             entry.st_mode |= stat.S_IFREG
424             if isinstance(e, FuseArvadosFile):
425                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
426
427         if self.enable_write and e.writable():
428             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
429
430         entry.st_nlink = 1
431         entry.st_uid = self.uid
432         entry.st_gid = self.gid
433         entry.st_rdev = 0
434
435         entry.st_size = e.size()
436
437         entry.st_blksize = 512
438         entry.st_blocks = (entry.st_size/512)+1
439         entry.st_atime = int(e.atime())
440         entry.st_mtime = int(e.mtime())
441         entry.st_ctime = int(e.mtime())
442
443         return entry
444
445     @catch_exceptions
446     def setattr(self, inode, attr):
447         entry = self.getattr(inode)
448
449         e = self.inodes[inode]
450
451         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
452             with llfuse.lock_released:
453                 e.arvfile.truncate(attr.st_size)
454                 entry.st_size = e.arvfile.size()
455
456         return entry
457
458     @catch_exceptions
459     def lookup(self, parent_inode, name):
460         name = unicode(name, self.inodes.encoding)
461         inode = None
462
463         if name == '.':
464             inode = parent_inode
465         else:
466             if parent_inode in self.inodes:
467                 p = self.inodes[parent_inode]
468                 self.inodes.touch(p)
469                 if name == '..':
470                     inode = p.parent_inode
471                 elif isinstance(p, Directory) and name in p:
472                     inode = p[name].inode
473
474         if inode != None:
475             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
476                       parent_inode, name, inode)
477             self.inodes[inode].inc_ref()
478             return self.getattr(inode)
479         else:
480             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
481                       parent_inode, name)
482             raise llfuse.FUSEError(errno.ENOENT)
483
484     @catch_exceptions
485     def forget(self, inodes):
486         if self._shutdown_started.is_set():
487             return
488         for inode, nlookup in inodes:
489             ent = self.inodes[inode]
490             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
491             if ent.dec_ref(nlookup) == 0 and ent.dead:
492                 self.inodes.del_entry(ent)
493
494     @catch_exceptions
495     def open(self, inode, flags):
496         if inode in self.inodes:
497             p = self.inodes[inode]
498         else:
499             raise llfuse.FUSEError(errno.ENOENT)
500
501         if isinstance(p, Directory):
502             raise llfuse.FUSEError(errno.EISDIR)
503
504         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
505             raise llfuse.FUSEError(errno.EPERM)
506
507         fh = next(self._filehandles_counter)
508         self._filehandles[fh] = FileHandle(fh, p)
509         self.inodes.touch(p)
510
511         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
512
513         return fh
514
515     @catch_exceptions
516     def read(self, fh, off, size):
517         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
518         self.read_ops_counter.add(1)
519
520         if fh in self._filehandles:
521             handle = self._filehandles[fh]
522         else:
523             raise llfuse.FUSEError(errno.EBADF)
524
525         self.inodes.touch(handle.obj)
526
527         r = handle.obj.readfrom(off, size, self.num_retries)
528         if r:
529             self.read_counter.add(len(r))
530         return r
531
532     @catch_exceptions
533     def write(self, fh, off, buf):
534         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
535         self.write_ops_counter.add(1)
536
537         if fh in self._filehandles:
538             handle = self._filehandles[fh]
539         else:
540             raise llfuse.FUSEError(errno.EBADF)
541
542         if not handle.obj.writable():
543             raise llfuse.FUSEError(errno.EPERM)
544
545         self.inodes.touch(handle.obj)
546
547         w = handle.obj.writeto(off, buf, self.num_retries)
548         if w:
549             self.write_counter.add(w)
550         return w
551
552     @catch_exceptions
553     def release(self, fh):
554         if fh in self._filehandles:
555             try:
556                 self._filehandles[fh].flush()
557             except Exception:
558                 raise
559             finally:
560                 self._filehandles[fh].release()
561                 del self._filehandles[fh]
562         self.inodes.inode_cache.cap_cache()
563
564     def releasedir(self, fh):
565         self.release(fh)
566
567     @catch_exceptions
568     def opendir(self, inode):
569         _logger.debug("arv-mount opendir: inode %i", inode)
570
571         if inode in self.inodes:
572             p = self.inodes[inode]
573         else:
574             raise llfuse.FUSEError(errno.ENOENT)
575
576         if not isinstance(p, Directory):
577             raise llfuse.FUSEError(errno.ENOTDIR)
578
579         fh = next(self._filehandles_counter)
580         if p.parent_inode in self.inodes:
581             parent = self.inodes[p.parent_inode]
582         else:
583             raise llfuse.FUSEError(errno.EIO)
584
585         # update atime
586         self.inodes.touch(p)
587
588         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
589         return fh
590
591     @catch_exceptions
592     def readdir(self, fh, off):
593         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
594
595         if fh in self._filehandles:
596             handle = self._filehandles[fh]
597         else:
598             raise llfuse.FUSEError(errno.EBADF)
599
600         e = off
601         while e < len(handle.entries):
602             if handle.entries[e][1].inode in self.inodes:
603                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
604             e += 1
605
606     @catch_exceptions
607     def statfs(self):
608         st = llfuse.StatvfsData()
609         st.f_bsize = 128 * 1024
610         st.f_blocks = 0
611         st.f_files = 0
612
613         st.f_bfree = 0
614         st.f_bavail = 0
615
616         st.f_ffree = 0
617         st.f_favail = 0
618
619         st.f_frsize = 0
620         return st
621
622     def _check_writable(self, inode_parent):
623         if not self.enable_write:
624             raise llfuse.FUSEError(errno.EROFS)
625
626         if inode_parent in self.inodes:
627             p = self.inodes[inode_parent]
628         else:
629             raise llfuse.FUSEError(errno.ENOENT)
630
631         if not isinstance(p, Directory):
632             raise llfuse.FUSEError(errno.ENOTDIR)
633
634         if not p.writable():
635             raise llfuse.FUSEError(errno.EPERM)
636
637         return p
638
639     @catch_exceptions
640     def create(self, inode_parent, name, mode, flags, ctx):
641         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
642
643         p = self._check_writable(inode_parent)
644         p.create(name)
645
646         # The file entry should have been implicitly created by callback.
647         f = p[name]
648         fh = next(self._filehandles_counter)
649         self._filehandles[fh] = FileHandle(fh, f)
650         self.inodes.touch(p)
651
652         f.inc_ref()
653         return (fh, self.getattr(f.inode))
654
655     @catch_exceptions
656     def mkdir(self, inode_parent, name, mode, ctx):
657         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
658
659         p = self._check_writable(inode_parent)
660         p.mkdir(name)
661
662         # The dir entry should have been implicitly created by callback.
663         d = p[name]
664
665         d.inc_ref()
666         return self.getattr(d.inode)
667
668     @catch_exceptions
669     def unlink(self, inode_parent, name):
670         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
671         p = self._check_writable(inode_parent)
672         p.unlink(name)
673
674     @catch_exceptions
675     def rmdir(self, inode_parent, name):
676         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
677         p = self._check_writable(inode_parent)
678         p.rmdir(name)
679
680     @catch_exceptions
681     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
682         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
683         src = self._check_writable(inode_parent_old)
684         dest = self._check_writable(inode_parent_new)
685         dest.rename(name_old, name_new, src)
686
687     @catch_exceptions
688     def flush(self, fh):
689         if fh in self._filehandles:
690             self._filehandles[fh].flush()
691
692     def fsync(self, fh, datasync):
693         self.flush(fh)
694
695     def fsyncdir(self, fh, datasync):
696         self.flush(fh)