Merge branch '5990-arv-run-default-runtime' closes #5990
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69
70 import Queue
71
72 # Default _notify_queue has a limit of 1000 items, but it really needs to be
73 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
74 # details.
75
76 llfuse.capi._notify_queue = Queue.Queue()
77
78 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
79 from fusefile import StringFile, FuseArvadosFile
80
81 _logger = logging.getLogger('arvados.arvados_fuse')
82
83 # Uncomment this to enable llfuse debug logging.
84 # log_handler = logging.StreamHandler()
85 # llogger = logging.getLogger('llfuse')
86 # llogger.addHandler(log_handler)
87 # llogger.setLevel(logging.DEBUG)
88
89 class Handle(object):
90     """Connects a numeric file handle to a File or Directory object that has
91     been opened by the client."""
92
93     def __init__(self, fh, obj):
94         self.fh = fh
95         self.obj = obj
96         self.obj.inc_use()
97
98     def release(self):
99         self.obj.dec_use()
100
101     def flush(self):
102         if self.obj.writable():
103             return self.obj.flush()
104
105
106 class FileHandle(Handle):
107     """Connects a numeric file handle to a File  object that has
108     been opened by the client."""
109     pass
110
111
112 class DirectoryHandle(Handle):
113     """Connects a numeric file handle to a Directory object that has
114     been opened by the client."""
115
116     def __init__(self, fh, dirobj, entries):
117         super(DirectoryHandle, self).__init__(fh, dirobj)
118         self.entries = entries
119
120
121 class InodeCache(object):
122     """Records the memory footprint of objects and when they are last used.
123
124     When the cache limit is exceeded, the least recently used objects are
125     cleared.  Clearing the object means discarding its contents to release
126     memory.  The next time the object is accessed, it must be re-fetched from
127     the server.  Note that the inode cache limit is a soft limit; the cache
128     limit may be exceeded if necessary to load very large objects, it may also
129     be exceeded if open file handles prevent objects from being cleared.
130
131     """
132
133     def __init__(self, cap, min_entries=4):
134         self._entries = collections.OrderedDict()
135         self._by_uuid = {}
136         self._counter = itertools.count(0)
137         self.cap = cap
138         self._total = 0
139         self.min_entries = min_entries
140
141     def total(self):
142         return self._total
143
144     def _remove(self, obj, clear):
145         if clear and not obj.clear():
146             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
147             return False
148         self._total -= obj.cache_size
149         del self._entries[obj.cache_priority]
150         if obj.cache_uuid:
151             del self._by_uuid[obj.cache_uuid]
152             obj.cache_uuid = None
153         if clear:
154             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
155         return True
156
157     def cap_cache(self):
158         if self._total > self.cap:
159             for key in list(self._entries.keys()):
160                 if self._total < self.cap or len(self._entries) < self.min_entries:
161                     break
162                 self._remove(self._entries[key], True)
163
164     def manage(self, obj):
165         if obj.persisted():
166             obj.cache_priority = next(self._counter)
167             obj.cache_size = obj.objsize()
168             self._entries[obj.cache_priority] = obj
169             obj.cache_uuid = obj.uuid()
170             if obj.cache_uuid:
171                 self._by_uuid[obj.cache_uuid] = obj
172             self._total += obj.objsize()
173             _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
174             self.cap_cache()
175         else:
176             obj.cache_priority = None
177
178     def touch(self, obj):
179         if obj.persisted():
180             if obj.cache_priority in self._entries:
181                 self._remove(obj, False)
182             self.manage(obj)
183
184     def unmanage(self, obj):
185         if obj.persisted() and obj.cache_priority in self._entries:
186             self._remove(obj, True)
187
188     def find(self, uuid):
189         return self._by_uuid.get(uuid)
190
191     def clear(self):
192         self._entries.clear()
193         self._by_uuid.clear()
194         self._total = 0
195
196 class Inodes(object):
197     """Manage the set of inodes.  This is the mapping from a numeric id
198     to a concrete File or Directory object"""
199
200     def __init__(self, inode_cache, encoding="utf-8"):
201         self._entries = {}
202         self._counter = itertools.count(llfuse.ROOT_INODE)
203         self.inode_cache = inode_cache
204         self.encoding = encoding
205         self.deferred_invalidations = []
206
207     def __getitem__(self, item):
208         return self._entries[item]
209
210     def __setitem__(self, key, item):
211         self._entries[key] = item
212
213     def __iter__(self):
214         return self._entries.iterkeys()
215
216     def items(self):
217         return self._entries.items()
218
219     def __contains__(self, k):
220         return k in self._entries
221
222     def touch(self, entry):
223         entry._atime = time.time()
224         self.inode_cache.touch(entry)
225
226     def add_entry(self, entry):
227         entry.inode = next(self._counter)
228         if entry.inode == llfuse.ROOT_INODE:
229             entry.inc_ref()
230         self._entries[entry.inode] = entry
231         self.inode_cache.manage(entry)
232         return entry
233
234     def del_entry(self, entry):
235         if entry.ref_count == 0:
236             self.inode_cache.unmanage(entry)
237             del self._entries[entry.inode]
238             with llfuse.lock_released:
239                 entry.finalize()
240             self.invalidate_inode(entry.inode)
241             entry.inode = None
242         else:
243             entry.dead = True
244             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
245
246     def invalidate_inode(self, inode):
247         llfuse.invalidate_inode(inode)
248
249     def invalidate_entry(self, inode, name):
250         llfuse.invalidate_entry(inode, name)
251
252     def clear(self):
253         self.inode_cache.clear()
254
255         for k,v in self._entries.items():
256             try:
257                 v.finalize()
258             except Exception as e:
259                 _logger.exception("Error during finalize of inode %i", k)
260
261         self._entries.clear()
262
263
264 def catch_exceptions(orig_func):
265     """Catch uncaught exceptions and log them consistently."""
266
267     @functools.wraps(orig_func)
268     def catch_exceptions_wrapper(self, *args, **kwargs):
269         try:
270             return orig_func(self, *args, **kwargs)
271         except llfuse.FUSEError:
272             raise
273         except EnvironmentError as e:
274             raise llfuse.FUSEError(e.errno)
275         except arvados.errors.KeepWriteError as e:
276             _logger.error("Keep write error: " + str(e))
277             raise llfuse.FUSEError(errno.EIO)
278         except arvados.errors.NotFoundError as e:
279             _logger.error("Block not found error: " + str(e))
280             raise llfuse.FUSEError(errno.EIO)
281         except:
282             _logger.exception("Unhandled exception during FUSE operation")
283             raise llfuse.FUSEError(errno.EIO)
284
285     return catch_exceptions_wrapper
286
287
288 class Operations(llfuse.Operations):
289     """This is the main interface with llfuse.
290
291     The methods on this object are called by llfuse threads to service FUSE
292     events to query and read from the file system.
293
294     llfuse has its own global lock which is acquired before calling a request handler,
295     so request handlers do not run concurrently unless the lock is explicitly released
296     using 'with llfuse.lock_released:'
297
298     """
299
300     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
301         super(Operations, self).__init__()
302
303         if not inode_cache:
304             inode_cache = InodeCache(cap=256*1024*1024)
305         self.inodes = Inodes(inode_cache, encoding=encoding)
306         self.uid = uid
307         self.gid = gid
308         self.enable_write = enable_write
309
310         # dict of inode to filehandle
311         self._filehandles = {}
312         self._filehandles_counter = itertools.count(0)
313
314         # Other threads that need to wait until the fuse driver
315         # is fully initialized should wait() on this event object.
316         self.initlock = threading.Event()
317
318         self.num_retries = num_retries
319
320         self.events = None
321
322     def init(self):
323         # Allow threads that are waiting for the driver to be finished
324         # initializing to continue
325         self.initlock.set()
326
327     @catch_exceptions
328     def destroy(self):
329         if self.events:
330             self.events.close()
331             self.events = None
332
333         self.inodes.clear()
334
335     def access(self, inode, mode, ctx):
336         return True
337
338     def listen_for_events(self, api_client):
339         self.events = arvados.events.subscribe(api_client,
340                                  [["event_type", "in", ["create", "update", "delete"]]],
341                                  self.on_event)
342
343     @catch_exceptions
344     def on_event(self, ev):
345         if 'event_type' in ev:
346             with llfuse.lock:
347                 item = self.inodes.inode_cache.find(ev["object_uuid"])
348                 if item is not None:
349                     item.invalidate()
350                     if ev["object_kind"] == "arvados#collection":
351                         new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
352
353                         # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
354                         # should always be the same.
355                         #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
356                         record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
357
358                         item.update(to_record_version=record_version)
359                     else:
360                         item.update()
361
362                 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
363                 olditemparent = self.inodes.inode_cache.find(oldowner)
364                 if olditemparent is not None:
365                     olditemparent.invalidate()
366                     olditemparent.update()
367
368                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
369                 if itemparent is not None:
370                     itemparent.invalidate()
371                     itemparent.update()
372
373
374     @catch_exceptions
375     def getattr(self, inode):
376         if inode not in self.inodes:
377             raise llfuse.FUSEError(errno.ENOENT)
378
379         e = self.inodes[inode]
380
381         entry = llfuse.EntryAttributes()
382         entry.st_ino = inode
383         entry.generation = 0
384         entry.entry_timeout = 60
385         entry.attr_timeout = 60
386
387         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
388         if isinstance(e, Directory):
389             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
390         else:
391             entry.st_mode |= stat.S_IFREG
392             if isinstance(e, FuseArvadosFile):
393                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
394
395         if self.enable_write and e.writable():
396             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
397
398         entry.st_nlink = 1
399         entry.st_uid = self.uid
400         entry.st_gid = self.gid
401         entry.st_rdev = 0
402
403         entry.st_size = e.size()
404
405         entry.st_blksize = 512
406         entry.st_blocks = (entry.st_size/512)+1
407         entry.st_atime = int(e.atime())
408         entry.st_mtime = int(e.mtime())
409         entry.st_ctime = int(e.mtime())
410
411         return entry
412
413     @catch_exceptions
414     def setattr(self, inode, attr):
415         entry = self.getattr(inode)
416
417         e = self.inodes[inode]
418
419         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
420             with llfuse.lock_released:
421                 e.arvfile.truncate(attr.st_size)
422                 entry.st_size = e.arvfile.size()
423
424         return entry
425
426     @catch_exceptions
427     def lookup(self, parent_inode, name):
428         name = unicode(name, self.inodes.encoding)
429         inode = None
430
431         if name == '.':
432             inode = parent_inode
433         else:
434             if parent_inode in self.inodes:
435                 p = self.inodes[parent_inode]
436                 if name == '..':
437                     inode = p.parent_inode
438                 elif isinstance(p, Directory) and name in p:
439                     inode = p[name].inode
440
441         if inode != None:
442             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
443                       parent_inode, name, inode)
444             self.inodes[inode].inc_ref()
445             return self.getattr(inode)
446         else:
447             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
448                       parent_inode, name)
449             raise llfuse.FUSEError(errno.ENOENT)
450
451     @catch_exceptions
452     def forget(self, inodes):
453         for inode, nlookup in inodes:
454             ent = self.inodes[inode]
455             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
456             if ent.dec_ref(nlookup) == 0 and ent.dead:
457                 self.inodes.del_entry(ent)
458
459     @catch_exceptions
460     def open(self, inode, flags):
461         if inode in self.inodes:
462             p = self.inodes[inode]
463         else:
464             raise llfuse.FUSEError(errno.ENOENT)
465
466         if isinstance(p, Directory):
467             raise llfuse.FUSEError(errno.EISDIR)
468
469         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
470             raise llfuse.FUSEError(errno.EPERM)
471
472         fh = next(self._filehandles_counter)
473         self._filehandles[fh] = FileHandle(fh, p)
474         self.inodes.touch(p)
475         return fh
476
477     @catch_exceptions
478     def read(self, fh, off, size):
479         _logger.debug("arv-mount read %i %i %i", fh, off, size)
480         if fh in self._filehandles:
481             handle = self._filehandles[fh]
482         else:
483             raise llfuse.FUSEError(errno.EBADF)
484
485         self.inodes.touch(handle.obj)
486
487         return handle.obj.readfrom(off, size, self.num_retries)
488
489     @catch_exceptions
490     def write(self, fh, off, buf):
491         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
492         if fh in self._filehandles:
493             handle = self._filehandles[fh]
494         else:
495             raise llfuse.FUSEError(errno.EBADF)
496
497         if not handle.obj.writable():
498             raise llfuse.FUSEError(errno.EPERM)
499
500         self.inodes.touch(handle.obj)
501
502         return handle.obj.writeto(off, buf, self.num_retries)
503
504     @catch_exceptions
505     def release(self, fh):
506         if fh in self._filehandles:
507             try:
508                 self._filehandles[fh].flush()
509             except Exception:
510                 raise
511             finally:
512                 self._filehandles[fh].release()
513                 del self._filehandles[fh]
514         self.inodes.inode_cache.cap_cache()
515
516     def releasedir(self, fh):
517         self.release(fh)
518
519     @catch_exceptions
520     def opendir(self, inode):
521         _logger.debug("arv-mount opendir: inode %i", inode)
522
523         if inode in self.inodes:
524             p = self.inodes[inode]
525         else:
526             raise llfuse.FUSEError(errno.ENOENT)
527
528         if not isinstance(p, Directory):
529             raise llfuse.FUSEError(errno.ENOTDIR)
530
531         fh = next(self._filehandles_counter)
532         if p.parent_inode in self.inodes:
533             parent = self.inodes[p.parent_inode]
534         else:
535             raise llfuse.FUSEError(errno.EIO)
536
537         # update atime
538         self.inodes.touch(p)
539
540         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
541         return fh
542
543     @catch_exceptions
544     def readdir(self, fh, off):
545         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
546
547         if fh in self._filehandles:
548             handle = self._filehandles[fh]
549         else:
550             raise llfuse.FUSEError(errno.EBADF)
551
552         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
553
554         e = off
555         while e < len(handle.entries):
556             if handle.entries[e][1].inode in self.inodes:
557                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
558             e += 1
559
560     @catch_exceptions
561     def statfs(self):
562         st = llfuse.StatvfsData()
563         st.f_bsize = 128 * 1024
564         st.f_blocks = 0
565         st.f_files = 0
566
567         st.f_bfree = 0
568         st.f_bavail = 0
569
570         st.f_ffree = 0
571         st.f_favail = 0
572
573         st.f_frsize = 0
574         return st
575
576     def _check_writable(self, inode_parent):
577         if not self.enable_write:
578             raise llfuse.FUSEError(errno.EROFS)
579
580         if inode_parent in self.inodes:
581             p = self.inodes[inode_parent]
582         else:
583             raise llfuse.FUSEError(errno.ENOENT)
584
585         if not isinstance(p, Directory):
586             raise llfuse.FUSEError(errno.ENOTDIR)
587
588         if not p.writable():
589             raise llfuse.FUSEError(errno.EPERM)
590
591         return p
592
593     @catch_exceptions
594     def create(self, inode_parent, name, mode, flags, ctx):
595         _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
596
597         p = self._check_writable(inode_parent)
598         p.create(name)
599
600         # The file entry should have been implicitly created by callback.
601         f = p[name]
602         fh = next(self._filehandles_counter)
603         self._filehandles[fh] = FileHandle(fh, f)
604         self.inodes.touch(p)
605
606         f.inc_ref()
607         return (fh, self.getattr(f.inode))
608
609     @catch_exceptions
610     def mkdir(self, inode_parent, name, mode, ctx):
611         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
612
613         p = self._check_writable(inode_parent)
614         p.mkdir(name)
615
616         # The dir entry should have been implicitly created by callback.
617         d = p[name]
618
619         d.inc_ref()
620         return self.getattr(d.inode)
621
622     @catch_exceptions
623     def unlink(self, inode_parent, name):
624         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
625         p = self._check_writable(inode_parent)
626         p.unlink(name)
627
628     @catch_exceptions
629     def rmdir(self, inode_parent, name):
630         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
631         p = self._check_writable(inode_parent)
632         p.rmdir(name)
633
634     @catch_exceptions
635     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
636         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
637         src = self._check_writable(inode_parent_old)
638         dest = self._check_writable(inode_parent_new)
639         dest.rename(name_old, name_new, src)
640
641     @catch_exceptions
642     def flush(self, fh):
643         if fh in self._filehandles:
644             self._filehandles[fh].flush()
645
646     def fsync(self, fh, datasync):
647         self.flush(fh)
648
649     def fsyncdir(self, fh, datasync):
650         self.flush(fh)