Merge branch 'master' into 3198-writable-fuse
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69
70 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
71 from fusefile import StringFile, FuseArvadosFile
72
73 _logger = logging.getLogger('arvados.arvados_fuse')
74
75 # Uncomment this to enable llfuse debug logging.
76 # log_handler = logging.StreamHandler()
77 # llogger = logging.getLogger('llfuse')
78 # llogger.addHandler(log_handler)
79 # llogger.setLevel(logging.DEBUG)
80
81 class Handle(object):
82     """Connects a numeric file handle to a File or Directory object that has
83     been opened by the client."""
84
85     def __init__(self, fh, obj):
86         self.fh = fh
87         self.obj = obj
88         self.obj.inc_use()
89
90     def release(self):
91         self.obj.dec_use()
92
93     def flush(self):
94         return self.obj.flush()
95
96
97 class FileHandle(Handle):
98     """Connects a numeric file handle to a File  object that has
99     been opened by the client."""
100     pass
101
102
103 class DirectoryHandle(Handle):
104     """Connects a numeric file handle to a Directory object that has
105     been opened by the client."""
106
107     def __init__(self, fh, dirobj, entries):
108         super(DirectoryHandle, self).__init__(fh, dirobj)
109         self.entries = entries
110
111
112 class InodeCache(object):
113     """Records the memory footprint of objects and when they are last used.
114
115     When the cache limit is exceeded, the least recently used objects are
116     cleared.  Clearing the object means discarding its contents to release
117     memory.  The next time the object is accessed, it must be re-fetched from
118     the server.  Note that the inode cache limit is a soft limit; the cache
119     limit may be exceeded if necessary to load very large objects, it may also
120     be exceeded if open file handles prevent objects from being cleared.
121
122     """
123
124     def __init__(self, cap, min_entries=4):
125         self._entries = collections.OrderedDict()
126         self._by_uuid = {}
127         self._counter = itertools.count(0)
128         self.cap = cap
129         self._total = 0
130         self.min_entries = min_entries
131
132     def total(self):
133         return self._total
134
135     def _remove(self, obj, clear):
136         if clear and not obj.clear():
137             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
138             return False
139         self._total -= obj.cache_size
140         del self._entries[obj.cache_priority]
141         if obj.cache_uuid:
142             del self._by_uuid[obj.cache_uuid]
143             obj.cache_uuid = None
144         if clear:
145             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
146         return True
147
148     def cap_cache(self):
149         if self._total > self.cap:
150             for key in list(self._entries.keys()):
151                 if self._total < self.cap or len(self._entries) < self.min_entries:
152                     break
153                 self._remove(self._entries[key], True)
154
155     def manage(self, obj):
156         if obj.persisted():
157             obj.cache_priority = next(self._counter)
158             obj.cache_size = obj.objsize()
159             self._entries[obj.cache_priority] = obj
160             obj.cache_uuid = obj.uuid()
161             if obj.cache_uuid:
162                 self._by_uuid[obj.cache_uuid] = obj
163             self._total += obj.objsize()
164             _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
165             self.cap_cache()
166         else:
167             obj.cache_priority = None
168
169     def touch(self, obj):
170         if obj.persisted():
171             if obj.cache_priority in self._entries:
172                 self._remove(obj, False)
173             self.manage(obj)
174
175     def unmanage(self, obj):
176         if obj.persisted() and obj.cache_priority in self._entries:
177             self._remove(obj, True)
178
179     def find(self, uuid):
180         return self._by_uuid.get(uuid)
181
182 class Inodes(object):
183     """Manage the set of inodes.  This is the mapping from a numeric id
184     to a concrete File or Directory object"""
185
186     def __init__(self, inode_cache, encoding="utf-8"):
187         self._entries = {}
188         self._counter = itertools.count(llfuse.ROOT_INODE)
189         self.inode_cache = inode_cache
190         self.encoding = encoding
191
192     def __getitem__(self, item):
193         return self._entries[item]
194
195     def __setitem__(self, key, item):
196         self._entries[key] = item
197
198     def __iter__(self):
199         return self._entries.iterkeys()
200
201     def items(self):
202         return self._entries.items()
203
204     def __contains__(self, k):
205         return k in self._entries
206
207     def touch(self, entry):
208         entry._atime = time.time()
209         self.inode_cache.touch(entry)
210
211     def add_entry(self, entry):
212         entry.inode = next(self._counter)
213         if entry.inode == llfuse.ROOT_INODE:
214             entry.inc_ref()
215         self._entries[entry.inode] = entry
216         self.inode_cache.manage(entry)
217         return entry
218
219     def del_entry(self, entry):
220         if entry.ref_count == 0:
221             _logger.debug("Deleting inode %i", entry.inode)
222             self.inode_cache.unmanage(entry)
223             llfuse.invalidate_inode(entry.inode)
224             entry.finalize()
225             del self._entries[entry.inode]
226             entry.inode = None
227         else:
228             entry.dead = True
229             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
230
231
232 def catch_exceptions(orig_func):
233     """Catch uncaught exceptions and log them consistently."""
234
235     @functools.wraps(orig_func)
236     def catch_exceptions_wrapper(self, *args, **kwargs):
237         try:
238             return orig_func(self, *args, **kwargs)
239         except llfuse.FUSEError:
240             raise
241         except EnvironmentError as e:
242             raise llfuse.FUSEError(e.errno)
243         except:
244             _logger.exception("Unhandled exception during FUSE operation")
245             raise llfuse.FUSEError(errno.EIO)
246
247     return catch_exceptions_wrapper
248
249
250 class Operations(llfuse.Operations):
251     """This is the main interface with llfuse.
252
253     The methods on this object are called by llfuse threads to service FUSE
254     events to query and read from the file system.
255
256     llfuse has its own global lock which is acquired before calling a request handler,
257     so request handlers do not run concurrently unless the lock is explicitly released
258     using 'with llfuse.lock_released:'
259
260     """
261
262     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
263         super(Operations, self).__init__()
264
265         if not inode_cache:
266             inode_cache = InodeCache(cap=256*1024*1024)
267         self.inodes = Inodes(inode_cache, encoding=encoding)
268         self.uid = uid
269         self.gid = gid
270         self.enable_write = enable_write
271
272         # dict of inode to filehandle
273         self._filehandles = {}
274         self._filehandles_counter = itertools.count(0)
275
276         # Other threads that need to wait until the fuse driver
277         # is fully initialized should wait() on this event object.
278         self.initlock = threading.Event()
279
280         self.num_retries = num_retries
281
282         self.events = None
283
284     def init(self):
285         # Allow threads that are waiting for the driver to be finished
286         # initializing to continue
287         self.initlock.set()
288
289     @catch_exceptions
290     def destroy(self):
291         if self.events:
292             self.events.close()
293             self.events = None
294
295         for k,v in self.inodes.items():
296             v.finalize()
297         self.inodes = None
298
299     def access(self, inode, mode, ctx):
300         return True
301
302     def listen_for_events(self, api_client):
303         self.events = arvados.events.subscribe(api_client,
304                                  [["event_type", "in", ["create", "update", "delete"]]],
305                                  self.on_event)
306
307     @catch_exceptions
308     def on_event(self, ev):
309         if 'event_type' in ev:
310             with llfuse.lock:
311                 item = self.inodes.inode_cache.find(ev["object_uuid"])
312                 if item is not None:
313                     item.invalidate()
314                     if ev["object_kind"] == "arvados#collection":
315                         new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
316                         record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
317                         item.update(to_record_version=record_version)
318                     else:
319                         item.update()
320
321                 oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
322                 olditemparent = self.inodes.inode_cache.find(oldowner)
323                 if olditemparent is not None:
324                     olditemparent.invalidate()
325                     olditemparent.update()
326
327                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
328                 if itemparent is not None:
329                     itemparent.invalidate()
330                     itemparent.update()
331
332     @catch_exceptions
333     def getattr(self, inode):
334         if inode not in self.inodes:
335             raise llfuse.FUSEError(errno.ENOENT)
336
337         e = self.inodes[inode]
338
339         entry = llfuse.EntryAttributes()
340         entry.st_ino = inode
341         entry.generation = 0
342         entry.entry_timeout = 60
343         entry.attr_timeout = 60
344
345         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
346         if isinstance(e, Directory):
347             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
348         else:
349             entry.st_mode |= stat.S_IFREG
350             if isinstance(e, FuseArvadosFile):
351                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
352
353         if self.enable_write and e.writable():
354             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
355
356         entry.st_nlink = 1
357         entry.st_uid = self.uid
358         entry.st_gid = self.gid
359         entry.st_rdev = 0
360
361         entry.st_size = e.size()
362
363         entry.st_blksize = 512
364         entry.st_blocks = (entry.st_size/512)+1
365         entry.st_atime = int(e.atime())
366         entry.st_mtime = int(e.mtime())
367         entry.st_ctime = int(e.mtime())
368
369         return entry
370
371     @catch_exceptions
372     def setattr(self, inode, attr):
373         entry = self.getattr(inode)
374
375         e = self.inodes[inode]
376
377         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
378             with llfuse.lock_released:
379                 e.arvfile.truncate(attr.st_size)
380                 entry.st_size = e.arvfile.size()
381
382         return entry
383
384     @catch_exceptions
385     def lookup(self, parent_inode, name):
386         name = unicode(name, self.inodes.encoding)
387         inode = None
388
389         if name == '.':
390             inode = parent_inode
391         else:
392             if parent_inode in self.inodes:
393                 p = self.inodes[parent_inode]
394                 if name == '..':
395                     inode = p.parent_inode
396                 elif isinstance(p, Directory) and name in p:
397                     inode = p[name].inode
398
399         if inode != None:
400             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
401                       parent_inode, name, inode)
402             self.inodes[inode].inc_ref()
403             return self.getattr(inode)
404         else:
405             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
406                       parent_inode, name)
407             raise llfuse.FUSEError(errno.ENOENT)
408
409     @catch_exceptions
410     def forget(self, inodes):
411         for inode, nlookup in inodes:
412             ent = self.inodes[inode]
413             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
414             if ent.dec_ref(nlookup) == 0 and ent.dead:
415                 self.inodes.del_entry(ent)
416
417     @catch_exceptions
418     def open(self, inode, flags):
419         if inode in self.inodes:
420             p = self.inodes[inode]
421         else:
422             raise llfuse.FUSEError(errno.ENOENT)
423
424         if isinstance(p, Directory):
425             raise llfuse.FUSEError(errno.EISDIR)
426
427         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
428             raise llfuse.FUSEError(errno.EPERM)
429
430         fh = next(self._filehandles_counter)
431         self._filehandles[fh] = FileHandle(fh, p)
432         self.inodes.touch(p)
433         return fh
434
435     @catch_exceptions
436     def read(self, fh, off, size):
437         _logger.debug("arv-mount read %i %i %i", fh, off, size)
438         if fh in self._filehandles:
439             handle = self._filehandles[fh]
440         else:
441             raise llfuse.FUSEError(errno.EBADF)
442
443         self.inodes.touch(handle.obj)
444
445         try:
446             return handle.obj.readfrom(off, size, self.num_retries)
447         except arvados.errors.NotFoundError as e:
448             _logger.error("Block not found: " + str(e))
449             raise llfuse.FUSEError(errno.EIO)
450
451     @catch_exceptions
452     def write(self, fh, off, buf):
453         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
454         if fh in self._filehandles:
455             handle = self._filehandles[fh]
456         else:
457             raise llfuse.FUSEError(errno.EBADF)
458
459         if not handle.obj.writable():
460             raise llfuse.FUSEError(errno.EPERM)
461
462         self.inodes.touch(handle.obj)
463
464         return handle.obj.writeto(off, buf, self.num_retries)
465
466     @catch_exceptions
467     def release(self, fh):
468         if fh in self._filehandles:
469             try:
470                 self._filehandles[fh].flush()
471             except EnvironmentError as e:
472                 raise llfuse.FUSEError(e.errno)
473             except Exception:
474                 _logger.exception("Flush error")
475             self._filehandles[fh].release()
476             del self._filehandles[fh]
477         self.inodes.inode_cache.cap_cache()
478
479     def releasedir(self, fh):
480         self.release(fh)
481
482     @catch_exceptions
483     def opendir(self, inode):
484         _logger.debug("arv-mount opendir: inode %i", inode)
485
486         if inode in self.inodes:
487             p = self.inodes[inode]
488         else:
489             raise llfuse.FUSEError(errno.ENOENT)
490
491         if not isinstance(p, Directory):
492             raise llfuse.FUSEError(errno.ENOTDIR)
493
494         fh = next(self._filehandles_counter)
495         if p.parent_inode in self.inodes:
496             parent = self.inodes[p.parent_inode]
497         else:
498             raise llfuse.FUSEError(errno.EIO)
499
500         # update atime
501         self.inodes.touch(p)
502
503         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
504         return fh
505
506     @catch_exceptions
507     def readdir(self, fh, off):
508         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
509
510         if fh in self._filehandles:
511             handle = self._filehandles[fh]
512         else:
513             raise llfuse.FUSEError(errno.EBADF)
514
515         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
516
517         e = off
518         while e < len(handle.entries):
519             if handle.entries[e][1].inode in self.inodes:
520                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
521             e += 1
522
523     @catch_exceptions
524     def statfs(self):
525         st = llfuse.StatvfsData()
526         st.f_bsize = 128 * 1024
527         st.f_blocks = 0
528         st.f_files = 0
529
530         st.f_bfree = 0
531         st.f_bavail = 0
532
533         st.f_ffree = 0
534         st.f_favail = 0
535
536         st.f_frsize = 0
537         return st
538
539     def _check_writable(self, inode_parent):
540         if not self.enable_write:
541             raise llfuse.FUSEError(errno.EROFS)
542
543         if inode_parent in self.inodes:
544             p = self.inodes[inode_parent]
545         else:
546             raise llfuse.FUSEError(errno.ENOENT)
547
548         if not isinstance(p, Directory):
549             raise llfuse.FUSEError(errno.ENOTDIR)
550
551         if not p.writable():
552             raise llfuse.FUSEError(errno.EPERM)
553
554         return p
555
556     @catch_exceptions
557     def create(self, inode_parent, name, mode, flags, ctx):
558         p = self._check_writable(inode_parent)
559         p.create(name)
560
561         # The file entry should have been implicitly created by callback.
562         f = p[name]
563         fh = next(self._filehandles_counter)
564         self._filehandles[fh] = FileHandle(fh, f)
565         self.inodes.touch(p)
566
567         f.inc_ref()
568         return (fh, self.getattr(f.inode))
569
570     @catch_exceptions
571     def mkdir(self, inode_parent, name, mode, ctx):
572         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
573
574         p = self._check_writable(inode_parent)
575         p.mkdir(name)
576
577         # The dir entry should have been implicitly created by callback.
578         d = p[name]
579
580         d.inc_ref()
581         return self.getattr(d.inode)
582
583     @catch_exceptions
584     def unlink(self, inode_parent, name):
585         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
586         p = self._check_writable(inode_parent)
587         p.unlink(name)
588
589     @catch_exceptions
590     def rmdir(self, inode_parent, name):
591         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
592         p = self._check_writable(inode_parent)
593         p.rmdir(name)
594
595     @catch_exceptions
596     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
597         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
598         src = self._check_writable(inode_parent_old)
599         dest = self._check_writable(inode_parent_new)
600         dest.rename(name_old, name_new, src)
601
602     @catch_exceptions
603     def flush(self, fh):
604         if fh in self._filehandles:
605             self._filehandles[fh].flush()
606
607     def fsync(self, fh, datasync):
608         self.flush(fh)
609
610     def fsyncdir(self, fh, datasync):
611         self.flush(fh)