Merge branch '5288-arv-copy-documentation' refs #5288
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69
70 import Queue
71
72 # Default _notify_queue has a limit of 1000 items, but it really needs to be
73 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
74 # details.
75
76 llfuse.capi._notify_queue = Queue.Queue()
77
78 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
79 from fusefile import StringFile, FuseArvadosFile
80
81 _logger = logging.getLogger('arvados.arvados_fuse')
82
83 # Uncomment this to enable llfuse debug logging.
84 # log_handler = logging.StreamHandler()
85 # llogger = logging.getLogger('llfuse')
86 # llogger.addHandler(log_handler)
87 # llogger.setLevel(logging.DEBUG)
88
89 class Handle(object):
90     """Connects a numeric file handle to a File or Directory object that has
91     been opened by the client."""
92
93     def __init__(self, fh, obj):
94         self.fh = fh
95         self.obj = obj
96         self.obj.inc_use()
97
98     def release(self):
99         self.obj.dec_use()
100
101     def flush(self):
102         if self.obj.writable():
103             return self.obj.flush()
104
105
106 class FileHandle(Handle):
107     """Connects a numeric file handle to a File  object that has
108     been opened by the client."""
109     pass
110
111
112 class DirectoryHandle(Handle):
113     """Connects a numeric file handle to a Directory object that has
114     been opened by the client."""
115
116     def __init__(self, fh, dirobj, entries):
117         super(DirectoryHandle, self).__init__(fh, dirobj)
118         self.entries = entries
119
120
121 class InodeCache(object):
122     """Records the memory footprint of objects and when they are last used.
123
124     When the cache limit is exceeded, the least recently used objects are
125     cleared.  Clearing the object means discarding its contents to release
126     memory.  The next time the object is accessed, it must be re-fetched from
127     the server.  Note that the inode cache limit is a soft limit; the cache
128     limit may be exceeded if necessary to load very large objects, it may also
129     be exceeded if open file handles prevent objects from being cleared.
130
131     """
132
133     def __init__(self, cap, min_entries=4):
134         self._entries = collections.OrderedDict()
135         self._by_uuid = {}
136         self._counter = itertools.count(0)
137         self.cap = cap
138         self._total = 0
139         self.min_entries = min_entries
140
141     def total(self):
142         return self._total
143
144     def _remove(self, obj, clear):
145         if clear and not obj.clear():
146             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
147             return False
148         self._total -= obj.cache_size
149         del self._entries[obj.cache_priority]
150         if obj.cache_uuid:
151             del self._by_uuid[obj.cache_uuid]
152             obj.cache_uuid = None
153         if clear:
154             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
155         return True
156
157     def cap_cache(self):
158         if self._total > self.cap:
159             for key in list(self._entries.keys()):
160                 if self._total < self.cap or len(self._entries) < self.min_entries:
161                     break
162                 self._remove(self._entries[key], True)
163
164     def manage(self, obj):
165         if obj.persisted():
166             obj.cache_priority = next(self._counter)
167             obj.cache_size = obj.objsize()
168             self._entries[obj.cache_priority] = obj
169             obj.cache_uuid = obj.uuid()
170             if obj.cache_uuid:
171                 self._by_uuid[obj.cache_uuid] = obj
172             self._total += obj.objsize()
173             _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
174             self.cap_cache()
175         else:
176             obj.cache_priority = None
177
178     def touch(self, obj):
179         if obj.persisted():
180             if obj.cache_priority in self._entries:
181                 self._remove(obj, False)
182             self.manage(obj)
183
184     def unmanage(self, obj):
185         if obj.persisted() and obj.cache_priority in self._entries:
186             self._remove(obj, True)
187
188     def find(self, uuid):
189         return self._by_uuid.get(uuid)
190
191 class Inodes(object):
192     """Manage the set of inodes.  This is the mapping from a numeric id
193     to a concrete File or Directory object"""
194
195     def __init__(self, inode_cache, encoding="utf-8"):
196         self._entries = {}
197         self._counter = itertools.count(llfuse.ROOT_INODE)
198         self.inode_cache = inode_cache
199         self.encoding = encoding
200         self.deferred_invalidations = []
201
202     def __getitem__(self, item):
203         return self._entries[item]
204
205     def __setitem__(self, key, item):
206         self._entries[key] = item
207
208     def __iter__(self):
209         return self._entries.iterkeys()
210
211     def items(self):
212         return self._entries.items()
213
214     def __contains__(self, k):
215         return k in self._entries
216
217     def touch(self, entry):
218         entry._atime = time.time()
219         self.inode_cache.touch(entry)
220
221     def add_entry(self, entry):
222         entry.inode = next(self._counter)
223         if entry.inode == llfuse.ROOT_INODE:
224             entry.inc_ref()
225         self._entries[entry.inode] = entry
226         self.inode_cache.manage(entry)
227         return entry
228
229     def del_entry(self, entry):
230         if entry.ref_count == 0:
231             self.inode_cache.unmanage(entry)
232             del self._entries[entry.inode]
233             with llfuse.lock_released:
234                 entry.finalize()
235             self.invalidate_inode(entry.inode)
236             entry.inode = None
237         else:
238             entry.dead = True
239             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
240
241     def invalidate_inode(self, inode):
242         llfuse.invalidate_inode(inode)
243
244     def invalidate_entry(self, inode, name):
245         llfuse.invalidate_entry(inode, name)
246
247
248 def catch_exceptions(orig_func):
249     """Catch uncaught exceptions and log them consistently."""
250
251     @functools.wraps(orig_func)
252     def catch_exceptions_wrapper(self, *args, **kwargs):
253         try:
254             return orig_func(self, *args, **kwargs)
255         except llfuse.FUSEError:
256             raise
257         except EnvironmentError as e:
258             raise llfuse.FUSEError(e.errno)
259         except arvados.errors.KeepWriteError as e:
260             _logger.error("Keep write error: " + str(e))
261             raise llfuse.FUSEError(errno.EIO)
262         except arvados.errors.NotFoundError as e:
263             _logger.error("Block not found error: " + str(e))
264             raise llfuse.FUSEError(errno.EIO)
265         except:
266             _logger.exception("Unhandled exception during FUSE operation")
267             raise llfuse.FUSEError(errno.EIO)
268
269     return catch_exceptions_wrapper
270
271
272 class Operations(llfuse.Operations):
273     """This is the main interface with llfuse.
274
275     The methods on this object are called by llfuse threads to service FUSE
276     events to query and read from the file system.
277
278     llfuse has its own global lock which is acquired before calling a request handler,
279     so request handlers do not run concurrently unless the lock is explicitly released
280     using 'with llfuse.lock_released:'
281
282     """
283
284     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
285         super(Operations, self).__init__()
286
287         if not inode_cache:
288             inode_cache = InodeCache(cap=256*1024*1024)
289         self.inodes = Inodes(inode_cache, encoding=encoding)
290         self.uid = uid
291         self.gid = gid
292         self.enable_write = enable_write
293
294         # dict of inode to filehandle
295         self._filehandles = {}
296         self._filehandles_counter = itertools.count(0)
297
298         # Other threads that need to wait until the fuse driver
299         # is fully initialized should wait() on this event object.
300         self.initlock = threading.Event()
301
302         self.num_retries = num_retries
303
304         self.events = None
305
306     def init(self):
307         # Allow threads that are waiting for the driver to be finished
308         # initializing to continue
309         self.initlock.set()
310
311     @catch_exceptions
312     def destroy(self):
313         if self.events:
314             self.events.close()
315             self.events = None
316
317         for k,v in self.inodes.items():
318             try:
319                 v.finalize()
320             except Exception as e:
321                 _logger.exception("Error during finalize of inode %i", k)
322         self.inodes = None
323
324     def access(self, inode, mode, ctx):
325         return True
326
327     def listen_for_events(self, api_client):
328         self.events = arvados.events.subscribe(api_client,
329                                  [["event_type", "in", ["create", "update", "delete"]]],
330                                  self.on_event)
331
332     @catch_exceptions
333     def on_event(self, ev):
334         if 'event_type' in ev:
335             with llfuse.lock:
336                 item = self.inodes.inode_cache.find(ev["object_uuid"])
337                 if item is not None:
338                     item.invalidate()
339                     if ev["object_kind"] == "arvados#collection":
340                         new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
341
342                         # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which
343                         # should always be the same.
344                         #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
345                         record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None
346
347                         item.update(to_record_version=record_version)
348                     else:
349                         item.update()
350
351                 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
352                 olditemparent = self.inodes.inode_cache.find(oldowner)
353                 if olditemparent is not None:
354                     olditemparent.invalidate()
355                     olditemparent.update()
356
357                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
358                 if itemparent is not None:
359                     itemparent.invalidate()
360                     itemparent.update()
361
362
363     @catch_exceptions
364     def getattr(self, inode):
365         if inode not in self.inodes:
366             raise llfuse.FUSEError(errno.ENOENT)
367
368         e = self.inodes[inode]
369
370         entry = llfuse.EntryAttributes()
371         entry.st_ino = inode
372         entry.generation = 0
373         entry.entry_timeout = 60
374         entry.attr_timeout = 60
375
376         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
377         if isinstance(e, Directory):
378             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
379         else:
380             entry.st_mode |= stat.S_IFREG
381             if isinstance(e, FuseArvadosFile):
382                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
383
384         if self.enable_write and e.writable():
385             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
386
387         entry.st_nlink = 1
388         entry.st_uid = self.uid
389         entry.st_gid = self.gid
390         entry.st_rdev = 0
391
392         entry.st_size = e.size()
393
394         entry.st_blksize = 512
395         entry.st_blocks = (entry.st_size/512)+1
396         entry.st_atime = int(e.atime())
397         entry.st_mtime = int(e.mtime())
398         entry.st_ctime = int(e.mtime())
399
400         return entry
401
402     @catch_exceptions
403     def setattr(self, inode, attr):
404         entry = self.getattr(inode)
405
406         e = self.inodes[inode]
407
408         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
409             with llfuse.lock_released:
410                 e.arvfile.truncate(attr.st_size)
411                 entry.st_size = e.arvfile.size()
412
413         return entry
414
415     @catch_exceptions
416     def lookup(self, parent_inode, name):
417         name = unicode(name, self.inodes.encoding)
418         inode = None
419
420         if name == '.':
421             inode = parent_inode
422         else:
423             if parent_inode in self.inodes:
424                 p = self.inodes[parent_inode]
425                 if name == '..':
426                     inode = p.parent_inode
427                 elif isinstance(p, Directory) and name in p:
428                     inode = p[name].inode
429
430         if inode != None:
431             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
432                       parent_inode, name, inode)
433             self.inodes[inode].inc_ref()
434             return self.getattr(inode)
435         else:
436             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
437                       parent_inode, name)
438             raise llfuse.FUSEError(errno.ENOENT)
439
440     @catch_exceptions
441     def forget(self, inodes):
442         for inode, nlookup in inodes:
443             ent = self.inodes[inode]
444             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
445             if ent.dec_ref(nlookup) == 0 and ent.dead:
446                 self.inodes.del_entry(ent)
447
448     @catch_exceptions
449     def open(self, inode, flags):
450         if inode in self.inodes:
451             p = self.inodes[inode]
452         else:
453             raise llfuse.FUSEError(errno.ENOENT)
454
455         if isinstance(p, Directory):
456             raise llfuse.FUSEError(errno.EISDIR)
457
458         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
459             raise llfuse.FUSEError(errno.EPERM)
460
461         fh = next(self._filehandles_counter)
462         self._filehandles[fh] = FileHandle(fh, p)
463         self.inodes.touch(p)
464         return fh
465
466     @catch_exceptions
467     def read(self, fh, off, size):
468         _logger.debug("arv-mount read %i %i %i", fh, off, size)
469         if fh in self._filehandles:
470             handle = self._filehandles[fh]
471         else:
472             raise llfuse.FUSEError(errno.EBADF)
473
474         self.inodes.touch(handle.obj)
475
476         return handle.obj.readfrom(off, size, self.num_retries)
477
478     @catch_exceptions
479     def write(self, fh, off, buf):
480         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
481         if fh in self._filehandles:
482             handle = self._filehandles[fh]
483         else:
484             raise llfuse.FUSEError(errno.EBADF)
485
486         if not handle.obj.writable():
487             raise llfuse.FUSEError(errno.EPERM)
488
489         self.inodes.touch(handle.obj)
490
491         return handle.obj.writeto(off, buf, self.num_retries)
492
493     @catch_exceptions
494     def release(self, fh):
495         if fh in self._filehandles:
496             try:
497                 self._filehandles[fh].flush()
498             except Exception:
499                 raise
500             finally:
501                 self._filehandles[fh].release()
502                 del self._filehandles[fh]
503         self.inodes.inode_cache.cap_cache()
504
505     def releasedir(self, fh):
506         self.release(fh)
507
508     @catch_exceptions
509     def opendir(self, inode):
510         _logger.debug("arv-mount opendir: inode %i", inode)
511
512         if inode in self.inodes:
513             p = self.inodes[inode]
514         else:
515             raise llfuse.FUSEError(errno.ENOENT)
516
517         if not isinstance(p, Directory):
518             raise llfuse.FUSEError(errno.ENOTDIR)
519
520         fh = next(self._filehandles_counter)
521         if p.parent_inode in self.inodes:
522             parent = self.inodes[p.parent_inode]
523         else:
524             raise llfuse.FUSEError(errno.EIO)
525
526         # update atime
527         self.inodes.touch(p)
528
529         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
530         return fh
531
532     @catch_exceptions
533     def readdir(self, fh, off):
534         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
535
536         if fh in self._filehandles:
537             handle = self._filehandles[fh]
538         else:
539             raise llfuse.FUSEError(errno.EBADF)
540
541         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
542
543         e = off
544         while e < len(handle.entries):
545             if handle.entries[e][1].inode in self.inodes:
546                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
547             e += 1
548
549     @catch_exceptions
550     def statfs(self):
551         st = llfuse.StatvfsData()
552         st.f_bsize = 128 * 1024
553         st.f_blocks = 0
554         st.f_files = 0
555
556         st.f_bfree = 0
557         st.f_bavail = 0
558
559         st.f_ffree = 0
560         st.f_favail = 0
561
562         st.f_frsize = 0
563         return st
564
565     def _check_writable(self, inode_parent):
566         if not self.enable_write:
567             raise llfuse.FUSEError(errno.EROFS)
568
569         if inode_parent in self.inodes:
570             p = self.inodes[inode_parent]
571         else:
572             raise llfuse.FUSEError(errno.ENOENT)
573
574         if not isinstance(p, Directory):
575             raise llfuse.FUSEError(errno.ENOTDIR)
576
577         if not p.writable():
578             raise llfuse.FUSEError(errno.EPERM)
579
580         return p
581
582     @catch_exceptions
583     def create(self, inode_parent, name, mode, flags, ctx):
584         _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
585
586         p = self._check_writable(inode_parent)
587         p.create(name)
588
589         # The file entry should have been implicitly created by callback.
590         f = p[name]
591         fh = next(self._filehandles_counter)
592         self._filehandles[fh] = FileHandle(fh, f)
593         self.inodes.touch(p)
594
595         f.inc_ref()
596         return (fh, self.getattr(f.inode))
597
598     @catch_exceptions
599     def mkdir(self, inode_parent, name, mode, ctx):
600         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
601
602         p = self._check_writable(inode_parent)
603         p.mkdir(name)
604
605         # The dir entry should have been implicitly created by callback.
606         d = p[name]
607
608         d.inc_ref()
609         return self.getattr(d.inode)
610
611     @catch_exceptions
612     def unlink(self, inode_parent, name):
613         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
614         p = self._check_writable(inode_parent)
615         p.unlink(name)
616
617     @catch_exceptions
618     def rmdir(self, inode_parent, name):
619         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
620         p = self._check_writable(inode_parent)
621         p.rmdir(name)
622
623     @catch_exceptions
624     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
625         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
626         src = self._check_writable(inode_parent_old)
627         dest = self._check_writable(inode_parent_new)
628         dest.rename(name_old, name_new, src)
629
630     @catch_exceptions
631     def flush(self, fh):
632         if fh in self._filehandles:
633             self._filehandles[fh].flush()
634
635     def fsync(self, fh, datasync):
636         self.flush(fh)
637
638     def fsyncdir(self, fh, datasync):
639         self.flush(fh)