Merge branch 'master' into github-3408-production-datamanager
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69
70 import Queue
71
72 # Default _notify_queue has a limit of 1000 items, but it really needs to be
73 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
74 # details.
75
76 llfuse.capi._notify_queue = Queue.Queue()
77
78 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
79 from fusefile import StringFile, FuseArvadosFile
80
81 _logger = logging.getLogger('arvados.arvados_fuse')
82
83 # Uncomment this to enable llfuse debug logging.
84 # log_handler = logging.StreamHandler()
85 # llogger = logging.getLogger('llfuse')
86 # llogger.addHandler(log_handler)
87 # llogger.setLevel(logging.DEBUG)
88
89 class Handle(object):
90     """Connects a numeric file handle to a File or Directory object that has
91     been opened by the client."""
92
93     def __init__(self, fh, obj):
94         self.fh = fh
95         self.obj = obj
96         self.obj.inc_use()
97
98     def release(self):
99         self.obj.dec_use()
100
101     def flush(self):
102         return self.obj.flush()
103
104
105 class FileHandle(Handle):
106     """Connects a numeric file handle to a File  object that has
107     been opened by the client."""
108     pass
109
110
111 class DirectoryHandle(Handle):
112     """Connects a numeric file handle to a Directory object that has
113     been opened by the client."""
114
115     def __init__(self, fh, dirobj, entries):
116         super(DirectoryHandle, self).__init__(fh, dirobj)
117         self.entries = entries
118
119
120 class InodeCache(object):
121     """Records the memory footprint of objects and when they are last used.
122
123     When the cache limit is exceeded, the least recently used objects are
124     cleared.  Clearing the object means discarding its contents to release
125     memory.  The next time the object is accessed, it must be re-fetched from
126     the server.  Note that the inode cache limit is a soft limit; the cache
127     limit may be exceeded if necessary to load very large objects, it may also
128     be exceeded if open file handles prevent objects from being cleared.
129
130     """
131
132     def __init__(self, cap, min_entries=4):
133         self._entries = collections.OrderedDict()
134         self._by_uuid = {}
135         self._counter = itertools.count(0)
136         self.cap = cap
137         self._total = 0
138         self.min_entries = min_entries
139
140     def total(self):
141         return self._total
142
143     def _remove(self, obj, clear):
144         if clear and not obj.clear():
145             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
146             return False
147         self._total -= obj.cache_size
148         del self._entries[obj.cache_priority]
149         if obj.cache_uuid:
150             del self._by_uuid[obj.cache_uuid]
151             obj.cache_uuid = None
152         if clear:
153             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
154         return True
155
156     def cap_cache(self):
157         if self._total > self.cap:
158             for key in list(self._entries.keys()):
159                 if self._total < self.cap or len(self._entries) < self.min_entries:
160                     break
161                 self._remove(self._entries[key], True)
162
163     def manage(self, obj):
164         if obj.persisted():
165             obj.cache_priority = next(self._counter)
166             obj.cache_size = obj.objsize()
167             self._entries[obj.cache_priority] = obj
168             obj.cache_uuid = obj.uuid()
169             if obj.cache_uuid:
170                 self._by_uuid[obj.cache_uuid] = obj
171             self._total += obj.objsize()
172             _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
173             self.cap_cache()
174         else:
175             obj.cache_priority = None
176
177     def touch(self, obj):
178         if obj.persisted():
179             if obj.cache_priority in self._entries:
180                 self._remove(obj, False)
181             self.manage(obj)
182
183     def unmanage(self, obj):
184         if obj.persisted() and obj.cache_priority in self._entries:
185             self._remove(obj, True)
186
187     def find(self, uuid):
188         return self._by_uuid.get(uuid)
189
190 class Inodes(object):
191     """Manage the set of inodes.  This is the mapping from a numeric id
192     to a concrete File or Directory object"""
193
194     def __init__(self, inode_cache, encoding="utf-8"):
195         self._entries = {}
196         self._counter = itertools.count(llfuse.ROOT_INODE)
197         self.inode_cache = inode_cache
198         self.encoding = encoding
199         self.deferred_invalidations = []
200
201     def __getitem__(self, item):
202         return self._entries[item]
203
204     def __setitem__(self, key, item):
205         self._entries[key] = item
206
207     def __iter__(self):
208         return self._entries.iterkeys()
209
210     def items(self):
211         return self._entries.items()
212
213     def __contains__(self, k):
214         return k in self._entries
215
216     def touch(self, entry):
217         entry._atime = time.time()
218         self.inode_cache.touch(entry)
219
220     def add_entry(self, entry):
221         entry.inode = next(self._counter)
222         if entry.inode == llfuse.ROOT_INODE:
223             entry.inc_ref()
224         self._entries[entry.inode] = entry
225         self.inode_cache.manage(entry)
226         return entry
227
228     def del_entry(self, entry):
229         if entry.ref_count == 0:
230             self.inode_cache.unmanage(entry)
231             del self._entries[entry.inode]
232             with llfuse.lock_released:
233                 entry.finalize()
234             self.invalidate_inode(entry.inode)
235             entry.inode = None
236         else:
237             entry.dead = True
238             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
239
240     def invalidate_inode(self, inode):
241         llfuse.invalidate_inode(inode)
242
243     def invalidate_entry(self, inode, name):
244         llfuse.invalidate_entry(inode, name)
245
246
247 def catch_exceptions(orig_func):
248     """Catch uncaught exceptions and log them consistently."""
249
250     @functools.wraps(orig_func)
251     def catch_exceptions_wrapper(self, *args, **kwargs):
252         try:
253             return orig_func(self, *args, **kwargs)
254         except llfuse.FUSEError:
255             raise
256         except EnvironmentError as e:
257             raise llfuse.FUSEError(e.errno)
258         except arvados.errors.KeepWriteError as e:
259             _logger.error("Keep write error: " + str(e))
260             raise llfuse.FUSEError(errno.EIO)
261         except arvados.errors.NotFoundError as e:
262             _logger.error("Block not found error: " + str(e))
263             raise llfuse.FUSEError(errno.EIO)
264         except:
265             _logger.exception("Unhandled exception during FUSE operation")
266             raise llfuse.FUSEError(errno.EIO)
267
268     return catch_exceptions_wrapper
269
270
271 class Operations(llfuse.Operations):
272     """This is the main interface with llfuse.
273
274     The methods on this object are called by llfuse threads to service FUSE
275     events to query and read from the file system.
276
277     llfuse has its own global lock which is acquired before calling a request handler,
278     so request handlers do not run concurrently unless the lock is explicitly released
279     using 'with llfuse.lock_released:'
280
281     """
282
283     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
284         super(Operations, self).__init__()
285
286         if not inode_cache:
287             inode_cache = InodeCache(cap=256*1024*1024)
288         self.inodes = Inodes(inode_cache, encoding=encoding)
289         self.uid = uid
290         self.gid = gid
291         self.enable_write = enable_write
292
293         # dict of inode to filehandle
294         self._filehandles = {}
295         self._filehandles_counter = itertools.count(0)
296
297         # Other threads that need to wait until the fuse driver
298         # is fully initialized should wait() on this event object.
299         self.initlock = threading.Event()
300
301         self.num_retries = num_retries
302
303         self.events = None
304
305     def init(self):
306         # Allow threads that are waiting for the driver to be finished
307         # initializing to continue
308         self.initlock.set()
309
310     @catch_exceptions
311     def destroy(self):
312         if self.events:
313             self.events.close()
314             self.events = None
315
316         for k,v in self.inodes.items():
317             try:
318                 v.finalize()
319             except Exception as e:
320                 _logger.exception("Error during finalize of inode %i", k)
321         self.inodes = None
322
323     def access(self, inode, mode, ctx):
324         return True
325
326     def listen_for_events(self, api_client):
327         self.events = arvados.events.subscribe(api_client,
328                                  [["event_type", "in", ["create", "update", "delete"]]],
329                                  self.on_event)
330
331     @catch_exceptions
332     def on_event(self, ev):
333         if 'event_type' in ev:
334             with llfuse.lock:
335                 item = self.inodes.inode_cache.find(ev["object_uuid"])
336                 if item is not None:
337                     item.invalidate()
338                     if ev["object_kind"] == "arvados#collection":
339                         new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
340
341                         # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
342                         # should always be the same.
343                         #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
344                         record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
345
346                         item.update(to_record_version=record_version)
347                     else:
348                         item.update()
349
350                 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
351                 olditemparent = self.inodes.inode_cache.find(oldowner)
352                 if olditemparent is not None:
353                     olditemparent.invalidate()
354                     olditemparent.update()
355
356                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
357                 if itemparent is not None:
358                     itemparent.invalidate()
359                     itemparent.update()
360
361
362     @catch_exceptions
363     def getattr(self, inode):
364         if inode not in self.inodes:
365             raise llfuse.FUSEError(errno.ENOENT)
366
367         e = self.inodes[inode]
368
369         entry = llfuse.EntryAttributes()
370         entry.st_ino = inode
371         entry.generation = 0
372         entry.entry_timeout = 60
373         entry.attr_timeout = 60
374
375         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
376         if isinstance(e, Directory):
377             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
378         else:
379             entry.st_mode |= stat.S_IFREG
380             if isinstance(e, FuseArvadosFile):
381                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
382
383         if self.enable_write and e.writable():
384             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
385
386         entry.st_nlink = 1
387         entry.st_uid = self.uid
388         entry.st_gid = self.gid
389         entry.st_rdev = 0
390
391         entry.st_size = e.size()
392
393         entry.st_blksize = 512
394         entry.st_blocks = (entry.st_size/512)+1
395         entry.st_atime = int(e.atime())
396         entry.st_mtime = int(e.mtime())
397         entry.st_ctime = int(e.mtime())
398
399         return entry
400
401     @catch_exceptions
402     def setattr(self, inode, attr):
403         entry = self.getattr(inode)
404
405         e = self.inodes[inode]
406
407         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
408             with llfuse.lock_released:
409                 e.arvfile.truncate(attr.st_size)
410                 entry.st_size = e.arvfile.size()
411
412         return entry
413
414     @catch_exceptions
415     def lookup(self, parent_inode, name):
416         name = unicode(name, self.inodes.encoding)
417         inode = None
418
419         if name == '.':
420             inode = parent_inode
421         else:
422             if parent_inode in self.inodes:
423                 p = self.inodes[parent_inode]
424                 if name == '..':
425                     inode = p.parent_inode
426                 elif isinstance(p, Directory) and name in p:
427                     inode = p[name].inode
428
429         if inode != None:
430             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
431                       parent_inode, name, inode)
432             self.inodes[inode].inc_ref()
433             return self.getattr(inode)
434         else:
435             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
436                       parent_inode, name)
437             raise llfuse.FUSEError(errno.ENOENT)
438
439     @catch_exceptions
440     def forget(self, inodes):
441         for inode, nlookup in inodes:
442             ent = self.inodes[inode]
443             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
444             if ent.dec_ref(nlookup) == 0 and ent.dead:
445                 self.inodes.del_entry(ent)
446
447     @catch_exceptions
448     def open(self, inode, flags):
449         if inode in self.inodes:
450             p = self.inodes[inode]
451         else:
452             raise llfuse.FUSEError(errno.ENOENT)
453
454         if isinstance(p, Directory):
455             raise llfuse.FUSEError(errno.EISDIR)
456
457         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
458             raise llfuse.FUSEError(errno.EPERM)
459
460         fh = next(self._filehandles_counter)
461         self._filehandles[fh] = FileHandle(fh, p)
462         self.inodes.touch(p)
463         return fh
464
465     @catch_exceptions
466     def read(self, fh, off, size):
467         _logger.debug("arv-mount read %i %i %i", fh, off, size)
468         if fh in self._filehandles:
469             handle = self._filehandles[fh]
470         else:
471             raise llfuse.FUSEError(errno.EBADF)
472
473         self.inodes.touch(handle.obj)
474
475         return handle.obj.readfrom(off, size, self.num_retries)
476
477     @catch_exceptions
478     def write(self, fh, off, buf):
479         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
480         if fh in self._filehandles:
481             handle = self._filehandles[fh]
482         else:
483             raise llfuse.FUSEError(errno.EBADF)
484
485         if not handle.obj.writable():
486             raise llfuse.FUSEError(errno.EPERM)
487
488         self.inodes.touch(handle.obj)
489
490         return handle.obj.writeto(off, buf, self.num_retries)
491
492     @catch_exceptions
493     def release(self, fh):
494         if fh in self._filehandles:
495             try:
496                 self._filehandles[fh].flush()
497             except Exception:
498                 raise
499             finally:
500                 self._filehandles[fh].release()
501                 del self._filehandles[fh]
502         self.inodes.inode_cache.cap_cache()
503
504     def releasedir(self, fh):
505         self.release(fh)
506
507     @catch_exceptions
508     def opendir(self, inode):
509         _logger.debug("arv-mount opendir: inode %i", inode)
510
511         if inode in self.inodes:
512             p = self.inodes[inode]
513         else:
514             raise llfuse.FUSEError(errno.ENOENT)
515
516         if not isinstance(p, Directory):
517             raise llfuse.FUSEError(errno.ENOTDIR)
518
519         fh = next(self._filehandles_counter)
520         if p.parent_inode in self.inodes:
521             parent = self.inodes[p.parent_inode]
522         else:
523             raise llfuse.FUSEError(errno.EIO)
524
525         # update atime
526         self.inodes.touch(p)
527
528         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
529         return fh
530
531     @catch_exceptions
532     def readdir(self, fh, off):
533         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
534
535         if fh in self._filehandles:
536             handle = self._filehandles[fh]
537         else:
538             raise llfuse.FUSEError(errno.EBADF)
539
540         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
541
542         e = off
543         while e < len(handle.entries):
544             if handle.entries[e][1].inode in self.inodes:
545                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
546             e += 1
547
548     @catch_exceptions
549     def statfs(self):
550         st = llfuse.StatvfsData()
551         st.f_bsize = 128 * 1024
552         st.f_blocks = 0
553         st.f_files = 0
554
555         st.f_bfree = 0
556         st.f_bavail = 0
557
558         st.f_ffree = 0
559         st.f_favail = 0
560
561         st.f_frsize = 0
562         return st
563
564     def _check_writable(self, inode_parent):
565         if not self.enable_write:
566             raise llfuse.FUSEError(errno.EROFS)
567
568         if inode_parent in self.inodes:
569             p = self.inodes[inode_parent]
570         else:
571             raise llfuse.FUSEError(errno.ENOENT)
572
573         if not isinstance(p, Directory):
574             raise llfuse.FUSEError(errno.ENOTDIR)
575
576         if not p.writable():
577             raise llfuse.FUSEError(errno.EPERM)
578
579         return p
580
581     @catch_exceptions
582     def create(self, inode_parent, name, mode, flags, ctx):
583         _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
584
585         p = self._check_writable(inode_parent)
586         p.create(name)
587
588         # The file entry should have been implicitly created by callback.
589         f = p[name]
590         fh = next(self._filehandles_counter)
591         self._filehandles[fh] = FileHandle(fh, f)
592         self.inodes.touch(p)
593
594         f.inc_ref()
595         return (fh, self.getattr(f.inode))
596
597     @catch_exceptions
598     def mkdir(self, inode_parent, name, mode, ctx):
599         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
600
601         p = self._check_writable(inode_parent)
602         p.mkdir(name)
603
604         # The dir entry should have been implicitly created by callback.
605         d = p[name]
606
607         d.inc_ref()
608         return self.getattr(d.inode)
609
610     @catch_exceptions
611     def unlink(self, inode_parent, name):
612         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
613         p = self._check_writable(inode_parent)
614         p.unlink(name)
615
616     @catch_exceptions
617     def rmdir(self, inode_parent, name):
618         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
619         p = self._check_writable(inode_parent)
620         p.rmdir(name)
621
622     @catch_exceptions
623     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
624         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
625         src = self._check_writable(inode_parent_old)
626         dest = self._check_writable(inode_parent_new)
627         dest.rename(name_old, name_new, src)
628
629     @catch_exceptions
630     def flush(self, fh):
631         if fh in self._filehandles:
632             self._filehandles[fh].flush()
633
634     def fsync(self, fh, datasync):
635         self.flush(fh)
636
637     def fsyncdir(self, fh, datasync):
638         self.flush(fh)