fc7aace9194b44781fb11680ad6d3b954f588bd2
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69 import arvados.keep
70
71 import Queue
72
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
75 # details.
76
77 llfuse.capi._notify_queue = Queue.Queue()
78
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
81
82 _logger = logging.getLogger('arvados.arvados_fuse')
83
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
89
90 class Handle(object):
91     """Connects a numeric file handle to a File or Directory object that has
92     been opened by the client."""
93
94     def __init__(self, fh, obj):
95         self.fh = fh
96         self.obj = obj
97         self.obj.inc_use()
98
99     def release(self):
100         self.obj.dec_use()
101
102     def flush(self):
103         if self.obj.writable():
104             return self.obj.flush()
105
106
107 class FileHandle(Handle):
108     """Connects a numeric file handle to a File  object that has
109     been opened by the client."""
110     pass
111
112
113 class DirectoryHandle(Handle):
114     """Connects a numeric file handle to a Directory object that has
115     been opened by the client."""
116
117     def __init__(self, fh, dirobj, entries):
118         super(DirectoryHandle, self).__init__(fh, dirobj)
119         self.entries = entries
120
121
122 class InodeCache(object):
123     """Records the memory footprint of objects and when they are last used.
124
125     When the cache limit is exceeded, the least recently used objects are
126     cleared.  Clearing the object means discarding its contents to release
127     memory.  The next time the object is accessed, it must be re-fetched from
128     the server.  Note that the inode cache limit is a soft limit; the cache
129     limit may be exceeded if necessary to load very large objects, it may also
130     be exceeded if open file handles prevent objects from being cleared.
131
132     """
133
134     def __init__(self, cap, min_entries=4):
135         self._entries = collections.OrderedDict()
136         self._by_uuid = {}
137         self._counter = itertools.count(0)
138         self.cap = cap
139         self._total = 0
140         self.min_entries = min_entries
141
142     def total(self):
143         return self._total
144
145     def _remove(self, obj, clear):
146         if clear and not obj.clear():
147             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
148             return False
149         self._total -= obj.cache_size
150         del self._entries[obj.cache_priority]
151         if obj.cache_uuid:
152             self._by_uuid[obj.cache_uuid].remove(obj)
153             if not self._by_uuid[obj.cache_uuid]:
154                 del self._by_uuid[obj.cache_uuid]
155             obj.cache_uuid = None
156         if clear:
157             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
158         return True
159
160     def cap_cache(self):
161         if self._total > self.cap:
162             for key in list(self._entries.keys()):
163                 if self._total < self.cap or len(self._entries) < self.min_entries:
164                     break
165                 self._remove(self._entries[key], True)
166
167     def manage(self, obj):
168         if obj.persisted():
169             obj.cache_priority = next(self._counter)
170             obj.cache_size = obj.objsize()
171             self._entries[obj.cache_priority] = obj
172             obj.cache_uuid = obj.uuid()
173             if obj.cache_uuid:
174                 if obj.cache_uuid not in self._by_uuid:
175                     self._by_uuid[obj.cache_uuid] = [obj]
176                 else:
177                     if obj not in self._by_uuid[obj.cache_uuid]:
178                         self._by_uuid[obj.cache_uuid].append(obj)
179             self._total += obj.objsize()
180             _logger.debug("InodeCache touched %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
181             self.cap_cache()
182         else:
183             obj.cache_priority = None
184
185     def touch(self, obj):
186         if obj.persisted():
187             if obj.cache_priority in self._entries:
188                 self._remove(obj, False)
189             self.manage(obj)
190
191     def unmanage(self, obj):
192         if obj.persisted() and obj.cache_priority in self._entries:
193             self._remove(obj, True)
194
195     def find(self, uuid):
196         return self._by_uuid.get(uuid)
197
198     def clear(self):
199         self._entries.clear()
200         self._by_uuid.clear()
201         self._total = 0
202
203 class Inodes(object):
204     """Manage the set of inodes.  This is the mapping from a numeric id
205     to a concrete File or Directory object"""
206
207     def __init__(self, inode_cache, encoding="utf-8"):
208         self._entries = {}
209         self._counter = itertools.count(llfuse.ROOT_INODE)
210         self.inode_cache = inode_cache
211         self.encoding = encoding
212         self.deferred_invalidations = []
213
214     def __getitem__(self, item):
215         return self._entries[item]
216
217     def __setitem__(self, key, item):
218         self._entries[key] = item
219
220     def __iter__(self):
221         return self._entries.iterkeys()
222
223     def items(self):
224         return self._entries.items()
225
226     def __contains__(self, k):
227         return k in self._entries
228
229     def touch(self, entry):
230         entry._atime = time.time()
231         self.inode_cache.touch(entry)
232
233     def add_entry(self, entry):
234         entry.inode = next(self._counter)
235         if entry.inode == llfuse.ROOT_INODE:
236             entry.inc_ref()
237         self._entries[entry.inode] = entry
238         self.inode_cache.manage(entry)
239         return entry
240
241     def del_entry(self, entry):
242         if entry.ref_count == 0:
243             self.inode_cache.unmanage(entry)
244             del self._entries[entry.inode]
245             with llfuse.lock_released:
246                 entry.finalize()
247             self.invalidate_inode(entry.inode)
248             entry.inode = None
249         else:
250             entry.dead = True
251             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
252
253     def invalidate_inode(self, inode):
254         llfuse.invalidate_inode(inode)
255
256     def invalidate_entry(self, inode, name):
257         llfuse.invalidate_entry(inode, name)
258
259     def clear(self):
260         self.inode_cache.clear()
261
262         for k,v in self._entries.items():
263             try:
264                 v.finalize()
265             except Exception as e:
266                 _logger.exception("Error during finalize of inode %i", k)
267
268         self._entries.clear()
269
270
271 def catch_exceptions(orig_func):
272     """Catch uncaught exceptions and log them consistently."""
273
274     @functools.wraps(orig_func)
275     def catch_exceptions_wrapper(self, *args, **kwargs):
276         try:
277             return orig_func(self, *args, **kwargs)
278         except llfuse.FUSEError:
279             raise
280         except EnvironmentError as e:
281             raise llfuse.FUSEError(e.errno)
282         except arvados.errors.KeepWriteError as e:
283             _logger.error("Keep write error: " + str(e))
284             raise llfuse.FUSEError(errno.EIO)
285         except arvados.errors.NotFoundError as e:
286             _logger.error("Block not found error: " + str(e))
287             raise llfuse.FUSEError(errno.EIO)
288         except:
289             _logger.exception("Unhandled exception during FUSE operation")
290             raise llfuse.FUSEError(errno.EIO)
291
292     return catch_exceptions_wrapper
293
294
295 class Operations(llfuse.Operations):
296     """This is the main interface with llfuse.
297
298     The methods on this object are called by llfuse threads to service FUSE
299     events to query and read from the file system.
300
301     llfuse has its own global lock which is acquired before calling a request handler,
302     so request handlers do not run concurrently unless the lock is explicitly released
303     using 'with llfuse.lock_released:'
304
305     """
306
307     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
308         super(Operations, self).__init__()
309
310         if not inode_cache:
311             inode_cache = InodeCache(cap=256*1024*1024)
312         self.inodes = Inodes(inode_cache, encoding=encoding)
313         self.uid = uid
314         self.gid = gid
315         self.enable_write = enable_write
316
317         # dict of inode to filehandle
318         self._filehandles = {}
319         self._filehandles_counter = itertools.count(0)
320
321         # Other threads that need to wait until the fuse driver
322         # is fully initialized should wait() on this event object.
323         self.initlock = threading.Event()
324
325         self.num_retries = num_retries
326
327         self.read_counter = arvados.keep.Counter()
328         self.write_counter = arvados.keep.Counter()
329
330         self.events = None
331
332     def init(self):
333         # Allow threads that are waiting for the driver to be finished
334         # initializing to continue
335         self.initlock.set()
336
337     @catch_exceptions
338     def destroy(self):
339         if self.events:
340             self.events.close()
341             self.events = None
342
343         self.inodes.clear()
344
345     def access(self, inode, mode, ctx):
346         return True
347
348     def listen_for_events(self, api_client):
349         self.events = arvados.events.subscribe(api_client,
350                                  [["event_type", "in", ["create", "update", "delete"]]],
351                                  self.on_event)
352
353     @catch_exceptions
354     def on_event(self, ev):
355         if 'event_type' in ev:
356             with llfuse.lock:
357                 items = self.inodes.inode_cache.find(ev["object_uuid"])
358                 if items is not None:
359                     for item in items:
360                         item.invalidate()
361                         if ev["object_kind"] == "arvados#collection":
362                             new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
363
364                             # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
365                             # should always be the same.
366                             #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
367                             record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
368
369                             item.update(to_record_version=record_version)
370                         else:
371                             item.update()
372
373                 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
374                 olditemparent = self.inodes.inode_cache.find(oldowner)
375                 if olditemparent is not None:
376                     olditemparent.invalidate()
377                     olditemparent.update()
378
379                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
380                 if itemparent is not None:
381                     itemparent.invalidate()
382                     itemparent.update()
383
384
385     @catch_exceptions
386     def getattr(self, inode):
387         if inode not in self.inodes:
388             raise llfuse.FUSEError(errno.ENOENT)
389
390         e = self.inodes[inode]
391
392         entry = llfuse.EntryAttributes()
393         entry.st_ino = inode
394         entry.generation = 0
395         entry.entry_timeout = 60
396         entry.attr_timeout = 60
397
398         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
399         if isinstance(e, Directory):
400             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
401         else:
402             entry.st_mode |= stat.S_IFREG
403             if isinstance(e, FuseArvadosFile):
404                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
405
406         if self.enable_write and e.writable():
407             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
408
409         entry.st_nlink = 1
410         entry.st_uid = self.uid
411         entry.st_gid = self.gid
412         entry.st_rdev = 0
413
414         entry.st_size = e.size()
415
416         entry.st_blksize = 512
417         entry.st_blocks = (entry.st_size/512)+1
418         entry.st_atime = int(e.atime())
419         entry.st_mtime = int(e.mtime())
420         entry.st_ctime = int(e.mtime())
421
422         return entry
423
424     @catch_exceptions
425     def setattr(self, inode, attr):
426         entry = self.getattr(inode)
427
428         e = self.inodes[inode]
429
430         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
431             with llfuse.lock_released:
432                 e.arvfile.truncate(attr.st_size)
433                 entry.st_size = e.arvfile.size()
434
435         return entry
436
437     @catch_exceptions
438     def lookup(self, parent_inode, name):
439         name = unicode(name, self.inodes.encoding)
440         inode = None
441
442         if name == '.':
443             inode = parent_inode
444         else:
445             if parent_inode in self.inodes:
446                 p = self.inodes[parent_inode]
447                 if name == '..':
448                     inode = p.parent_inode
449                 elif isinstance(p, Directory) and name in p:
450                     inode = p[name].inode
451
452         if inode != None:
453             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
454                       parent_inode, name, inode)
455             self.inodes[inode].inc_ref()
456             return self.getattr(inode)
457         else:
458             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
459                       parent_inode, name)
460             raise llfuse.FUSEError(errno.ENOENT)
461
462     @catch_exceptions
463     def forget(self, inodes):
464         for inode, nlookup in inodes:
465             ent = self.inodes[inode]
466             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
467             if ent.dec_ref(nlookup) == 0 and ent.dead:
468                 self.inodes.del_entry(ent)
469
470     @catch_exceptions
471     def open(self, inode, flags):
472         if inode in self.inodes:
473             p = self.inodes[inode]
474         else:
475             raise llfuse.FUSEError(errno.ENOENT)
476
477         if isinstance(p, Directory):
478             raise llfuse.FUSEError(errno.EISDIR)
479
480         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
481             raise llfuse.FUSEError(errno.EPERM)
482
483         fh = next(self._filehandles_counter)
484         self._filehandles[fh] = FileHandle(fh, p)
485         self.inodes.touch(p)
486         return fh
487
488     @catch_exceptions
489     def read(self, fh, off, size):
490         _logger.debug("arv-mount read %i %i %i", fh, off, size)
491         if fh in self._filehandles:
492             handle = self._filehandles[fh]
493         else:
494             raise llfuse.FUSEError(errno.EBADF)
495
496         self.inodes.touch(handle.obj)
497
498         r = handle.obj.readfrom(off, size, self.num_retries)
499         if r:
500             self.read_counter.add(len(r))
501         return r
502
503     @catch_exceptions
504     def write(self, fh, off, buf):
505         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
506         if fh in self._filehandles:
507             handle = self._filehandles[fh]
508         else:
509             raise llfuse.FUSEError(errno.EBADF)
510
511         if not handle.obj.writable():
512             raise llfuse.FUSEError(errno.EPERM)
513
514         self.inodes.touch(handle.obj)
515
516         w = handle.obj.writeto(off, buf, self.num_retries)
517         if w:
518             self.write_counter.add(w)
519         return w
520
521     @catch_exceptions
522     def release(self, fh):
523         if fh in self._filehandles:
524             try:
525                 self._filehandles[fh].flush()
526             except Exception:
527                 raise
528             finally:
529                 self._filehandles[fh].release()
530                 del self._filehandles[fh]
531         self.inodes.inode_cache.cap_cache()
532
533     def releasedir(self, fh):
534         self.release(fh)
535
536     @catch_exceptions
537     def opendir(self, inode):
538         _logger.debug("arv-mount opendir: inode %i", inode)
539
540         if inode in self.inodes:
541             p = self.inodes[inode]
542         else:
543             raise llfuse.FUSEError(errno.ENOENT)
544
545         if not isinstance(p, Directory):
546             raise llfuse.FUSEError(errno.ENOTDIR)
547
548         fh = next(self._filehandles_counter)
549         if p.parent_inode in self.inodes:
550             parent = self.inodes[p.parent_inode]
551         else:
552             raise llfuse.FUSEError(errno.EIO)
553
554         # update atime
555         self.inodes.touch(p)
556
557         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
558         return fh
559
560     @catch_exceptions
561     def readdir(self, fh, off):
562         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
563
564         if fh in self._filehandles:
565             handle = self._filehandles[fh]
566         else:
567             raise llfuse.FUSEError(errno.EBADF)
568
569         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
570
571         e = off
572         while e < len(handle.entries):
573             if handle.entries[e][1].inode in self.inodes:
574                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
575             e += 1
576
577     @catch_exceptions
578     def statfs(self):
579         st = llfuse.StatvfsData()
580         st.f_bsize = 128 * 1024
581         st.f_blocks = 0
582         st.f_files = 0
583
584         st.f_bfree = 0
585         st.f_bavail = 0
586
587         st.f_ffree = 0
588         st.f_favail = 0
589
590         st.f_frsize = 0
591         return st
592
593     def _check_writable(self, inode_parent):
594         if not self.enable_write:
595             raise llfuse.FUSEError(errno.EROFS)
596
597         if inode_parent in self.inodes:
598             p = self.inodes[inode_parent]
599         else:
600             raise llfuse.FUSEError(errno.ENOENT)
601
602         if not isinstance(p, Directory):
603             raise llfuse.FUSEError(errno.ENOTDIR)
604
605         if not p.writable():
606             raise llfuse.FUSEError(errno.EPERM)
607
608         return p
609
610     @catch_exceptions
611     def create(self, inode_parent, name, mode, flags, ctx):
612         _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
613
614         p = self._check_writable(inode_parent)
615         p.create(name)
616
617         # The file entry should have been implicitly created by callback.
618         f = p[name]
619         fh = next(self._filehandles_counter)
620         self._filehandles[fh] = FileHandle(fh, f)
621         self.inodes.touch(p)
622
623         f.inc_ref()
624         return (fh, self.getattr(f.inode))
625
626     @catch_exceptions
627     def mkdir(self, inode_parent, name, mode, ctx):
628         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
629
630         p = self._check_writable(inode_parent)
631         p.mkdir(name)
632
633         # The dir entry should have been implicitly created by callback.
634         d = p[name]
635
636         d.inc_ref()
637         return self.getattr(d.inode)
638
639     @catch_exceptions
640     def unlink(self, inode_parent, name):
641         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
642         p = self._check_writable(inode_parent)
643         p.unlink(name)
644
645     @catch_exceptions
646     def rmdir(self, inode_parent, name):
647         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
648         p = self._check_writable(inode_parent)
649         p.rmdir(name)
650
651     @catch_exceptions
652     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
653         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
654         src = self._check_writable(inode_parent_old)
655         dest = self._check_writable(inode_parent_new)
656         dest.rename(name_old, name_new, src)
657
658     @catch_exceptions
659     def flush(self, fh):
660         if fh in self._filehandles:
661             self._filehandles[fh].flush()
662
663     def fsync(self, fh, datasync):
664         self.flush(fh)
665
666     def fsyncdir(self, fh, datasync):
667         self.flush(fh)