Merge branch '10722-cwl-subworkflow' closes #10722
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69 import arvados.keep
70
71 import Queue
72
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
75 # details.
76
77 llfuse._notify_queue = Queue.Queue()
78
79 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
80 from fusefile import StringFile, FuseArvadosFile
81
82 _logger = logging.getLogger('arvados.arvados_fuse')
83
84 # Uncomment this to enable llfuse debug logging.
85 # log_handler = logging.StreamHandler()
86 # llogger = logging.getLogger('llfuse')
87 # llogger.addHandler(log_handler)
88 # llogger.setLevel(logging.DEBUG)
89
90 class Handle(object):
91     """Connects a numeric file handle to a File or Directory object that has
92     been opened by the client."""
93
94     def __init__(self, fh, obj):
95         self.fh = fh
96         self.obj = obj
97         self.obj.inc_use()
98
99     def release(self):
100         self.obj.dec_use()
101
102     def flush(self):
103         if self.obj.writable():
104             return self.obj.flush()
105
106
107 class FileHandle(Handle):
108     """Connects a numeric file handle to a File  object that has
109     been opened by the client."""
110     pass
111
112
113 class DirectoryHandle(Handle):
114     """Connects a numeric file handle to a Directory object that has
115     been opened by the client."""
116
117     def __init__(self, fh, dirobj, entries):
118         super(DirectoryHandle, self).__init__(fh, dirobj)
119         self.entries = entries
120
121
122 class InodeCache(object):
123     """Records the memory footprint of objects and when they are last used.
124
125     When the cache limit is exceeded, the least recently used objects are
126     cleared.  Clearing the object means discarding its contents to release
127     memory.  The next time the object is accessed, it must be re-fetched from
128     the server.  Note that the inode cache limit is a soft limit; the cache
129     limit may be exceeded if necessary to load very large objects, it may also
130     be exceeded if open file handles prevent objects from being cleared.
131
132     """
133
134     def __init__(self, cap, min_entries=4):
135         self._entries = collections.OrderedDict()
136         self._by_uuid = {}
137         self.cap = cap
138         self._total = 0
139         self.min_entries = min_entries
140
141     def total(self):
142         return self._total
143
144     def _remove(self, obj, clear):
145         if clear:
146             if obj.in_use():
147                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
148                 return
149             if obj.has_ref(True):
150                 obj.kernel_invalidate()
151                 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
152                 return
153             obj.clear()
154
155         # The llfuse lock is released in del_entry(), which is called by
156         # Directory.clear().  While the llfuse lock is released, it can happen
157         # that a reentrant call removes this entry before this call gets to it.
158         # Ensure that the entry is still valid before trying to remove it.
159         if obj.inode not in self._entries:
160             return
161
162         self._total -= obj.cache_size
163         del self._entries[obj.inode]
164         if obj.cache_uuid:
165             self._by_uuid[obj.cache_uuid].remove(obj)
166             if not self._by_uuid[obj.cache_uuid]:
167                 del self._by_uuid[obj.cache_uuid]
168             obj.cache_uuid = None
169         if clear:
170             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
171
172     def cap_cache(self):
173         if self._total > self.cap:
174             for ent in self._entries.values():
175                 if self._total < self.cap or len(self._entries) < self.min_entries:
176                     break
177                 self._remove(ent, True)
178
179     def manage(self, obj):
180         if obj.persisted():
181             obj.cache_size = obj.objsize()
182             self._entries[obj.inode] = obj
183             obj.cache_uuid = obj.uuid()
184             if obj.cache_uuid:
185                 if obj.cache_uuid not in self._by_uuid:
186                     self._by_uuid[obj.cache_uuid] = [obj]
187                 else:
188                     if obj not in self._by_uuid[obj.cache_uuid]:
189                         self._by_uuid[obj.cache_uuid].append(obj)
190             self._total += obj.objsize()
191             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
192             self.cap_cache()
193
194     def touch(self, obj):
195         if obj.persisted():
196             if obj.inode in self._entries:
197                 self._remove(obj, False)
198             self.manage(obj)
199
200     def unmanage(self, obj):
201         if obj.persisted() and obj.inode in self._entries:
202             self._remove(obj, True)
203
204     def find_by_uuid(self, uuid):
205         return self._by_uuid.get(uuid, [])
206
207     def clear(self):
208         self._entries.clear()
209         self._by_uuid.clear()
210         self._total = 0
211
212 class Inodes(object):
213     """Manage the set of inodes.  This is the mapping from a numeric id
214     to a concrete File or Directory object"""
215
216     def __init__(self, inode_cache, encoding="utf-8"):
217         self._entries = {}
218         self._counter = itertools.count(llfuse.ROOT_INODE)
219         self.inode_cache = inode_cache
220         self.encoding = encoding
221         self.deferred_invalidations = []
222
223     def __getitem__(self, item):
224         return self._entries[item]
225
226     def __setitem__(self, key, item):
227         self._entries[key] = item
228
229     def __iter__(self):
230         return self._entries.iterkeys()
231
232     def items(self):
233         return self._entries.items()
234
235     def __contains__(self, k):
236         return k in self._entries
237
238     def touch(self, entry):
239         entry._atime = time.time()
240         self.inode_cache.touch(entry)
241
242     def add_entry(self, entry):
243         entry.inode = next(self._counter)
244         if entry.inode == llfuse.ROOT_INODE:
245             entry.inc_ref()
246         self._entries[entry.inode] = entry
247         self.inode_cache.manage(entry)
248         return entry
249
250     def del_entry(self, entry):
251         if entry.ref_count == 0:
252             self.inode_cache.unmanage(entry)
253             del self._entries[entry.inode]
254             with llfuse.lock_released:
255                 entry.finalize()
256             self.invalidate_inode(entry.inode)
257             entry.inode = None
258         else:
259             entry.dead = True
260             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
261
262     def invalidate_inode(self, inode):
263         llfuse.invalidate_inode(inode)
264
265     def invalidate_entry(self, inode, name):
266         llfuse.invalidate_entry(inode, name.encode(self.encoding))
267
268     def clear(self):
269         self.inode_cache.clear()
270
271         for k,v in self._entries.items():
272             try:
273                 v.finalize()
274             except Exception as e:
275                 _logger.exception("Error during finalize of inode %i", k)
276
277         self._entries.clear()
278
279
280 def catch_exceptions(orig_func):
281     """Catch uncaught exceptions and log them consistently."""
282
283     @functools.wraps(orig_func)
284     def catch_exceptions_wrapper(self, *args, **kwargs):
285         try:
286             return orig_func(self, *args, **kwargs)
287         except llfuse.FUSEError:
288             raise
289         except EnvironmentError as e:
290             raise llfuse.FUSEError(e.errno)
291         except arvados.errors.KeepWriteError as e:
292             _logger.error("Keep write error: " + str(e))
293             raise llfuse.FUSEError(errno.EIO)
294         except arvados.errors.NotFoundError as e:
295             _logger.error("Block not found error: " + str(e))
296             raise llfuse.FUSEError(errno.EIO)
297         except:
298             _logger.exception("Unhandled exception during FUSE operation")
299             raise llfuse.FUSEError(errno.EIO)
300
301     return catch_exceptions_wrapper
302
303
304 class Operations(llfuse.Operations):
305     """This is the main interface with llfuse.
306
307     The methods on this object are called by llfuse threads to service FUSE
308     events to query and read from the file system.
309
310     llfuse has its own global lock which is acquired before calling a request handler,
311     so request handlers do not run concurrently unless the lock is explicitly released
312     using 'with llfuse.lock_released:'
313
314     """
315
316     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
317         super(Operations, self).__init__()
318
319         self._api_client = api_client
320
321         if not inode_cache:
322             inode_cache = InodeCache(cap=256*1024*1024)
323         self.inodes = Inodes(inode_cache, encoding=encoding)
324         self.uid = uid
325         self.gid = gid
326         self.enable_write = enable_write
327
328         # dict of inode to filehandle
329         self._filehandles = {}
330         self._filehandles_counter = itertools.count(0)
331
332         # Other threads that need to wait until the fuse driver
333         # is fully initialized should wait() on this event object.
334         self.initlock = threading.Event()
335
336         # If we get overlapping shutdown events (e.g., fusermount -u
337         # -z and operations.destroy()) llfuse calls forget() on inodes
338         # that have already been deleted. To avoid this, we make
339         # forget() a no-op if called after destroy().
340         self._shutdown_started = threading.Event()
341
342         self.num_retries = num_retries
343
344         self.read_counter = arvados.keep.Counter()
345         self.write_counter = arvados.keep.Counter()
346         self.read_ops_counter = arvados.keep.Counter()
347         self.write_ops_counter = arvados.keep.Counter()
348
349         self.events = None
350
351     def init(self):
352         # Allow threads that are waiting for the driver to be finished
353         # initializing to continue
354         self.initlock.set()
355
356     @catch_exceptions
357     def destroy(self):
358         self._shutdown_started.set()
359         if self.events:
360             self.events.close()
361             self.events = None
362
363         self.inodes.clear()
364
365     def access(self, inode, mode, ctx):
366         return True
367
368     def listen_for_events(self):
369         self.events = arvados.events.subscribe(
370             self._api_client,
371             [["event_type", "in", ["create", "update", "delete"]]],
372             self.on_event)
373
374     @catch_exceptions
375     def on_event(self, ev):
376         if 'event_type' not in ev:
377             return
378         with llfuse.lock:
379             new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
380             pdh = new_attrs.get("portable_data_hash")
381             # new_attributes.modified_at currently lacks
382             # subsecond precision (see #6347) so use event_at
383             # which should always be the same.
384             stamp = ev.get("event_at")
385
386             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
387                 item.invalidate()
388                 if stamp and pdh and ev.get("object_kind") == "arvados#collection":
389                     item.update(to_record_version=(stamp, pdh))
390                 else:
391                     item.update()
392
393             oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
394             newowner = ev.get("object_owner_uuid")
395             for parent in (
396                     self.inodes.inode_cache.find_by_uuid(oldowner) +
397                     self.inodes.inode_cache.find_by_uuid(newowner)):
398                 parent.invalidate()
399                 parent.update()
400
401     @catch_exceptions
402     def getattr(self, inode, ctx=None):
403         if inode not in self.inodes:
404             raise llfuse.FUSEError(errno.ENOENT)
405
406         e = self.inodes[inode]
407
408         entry = llfuse.EntryAttributes()
409         entry.st_ino = inode
410         entry.generation = 0
411         entry.entry_timeout = 60 if e.allow_dirent_cache else 0
412         entry.attr_timeout = 60 if e.allow_attr_cache else 0
413
414         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
415         if isinstance(e, Directory):
416             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
417         else:
418             entry.st_mode |= stat.S_IFREG
419             if isinstance(e, FuseArvadosFile):
420                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
421
422         if self.enable_write and e.writable():
423             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
424
425         entry.st_nlink = 1
426         entry.st_uid = self.uid
427         entry.st_gid = self.gid
428         entry.st_rdev = 0
429
430         entry.st_size = e.size()
431
432         entry.st_blksize = 512
433         entry.st_blocks = (entry.st_size/512)+1
434         entry.st_atime_ns = int(e.atime() * 1000000000)
435         entry.st_mtime_ns = int(e.mtime() * 1000000000)
436         entry.st_ctime_ns = int(e.mtime() * 1000000000)
437
438         return entry
439
440     @catch_exceptions
441     def setattr(self, inode, attr, fields, fh, ctx):
442         entry = self.getattr(inode)
443
444         if fh is not None and fh in self._filehandles:
445             handle = self._filehandles[fh]
446             e = handle.obj
447         else:
448             e = self.inodes[inode]
449
450         if fields.update_size and isinstance(e, FuseArvadosFile):
451             with llfuse.lock_released:
452                 e.arvfile.truncate(attr.st_size)
453                 entry.st_size = e.arvfile.size()
454
455         return entry
456
457     @catch_exceptions
458     def lookup(self, parent_inode, name, ctx):
459         name = unicode(name, self.inodes.encoding)
460         inode = None
461
462         if name == '.':
463             inode = parent_inode
464         else:
465             if parent_inode in self.inodes:
466                 p = self.inodes[parent_inode]
467                 self.inodes.touch(p)
468                 if name == '..':
469                     inode = p.parent_inode
470                 elif isinstance(p, Directory) and name in p:
471                     inode = p[name].inode
472
473         if inode != None:
474             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
475                       parent_inode, name, inode)
476             self.inodes[inode].inc_ref()
477             return self.getattr(inode)
478         else:
479             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
480                       parent_inode, name)
481             raise llfuse.FUSEError(errno.ENOENT)
482
483     @catch_exceptions
484     def forget(self, inodes):
485         if self._shutdown_started.is_set():
486             return
487         for inode, nlookup in inodes:
488             ent = self.inodes[inode]
489             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
490             if ent.dec_ref(nlookup) == 0 and ent.dead:
491                 self.inodes.del_entry(ent)
492
493     @catch_exceptions
494     def open(self, inode, flags, ctx):
495         if inode in self.inodes:
496             p = self.inodes[inode]
497         else:
498             raise llfuse.FUSEError(errno.ENOENT)
499
500         if isinstance(p, Directory):
501             raise llfuse.FUSEError(errno.EISDIR)
502
503         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
504             raise llfuse.FUSEError(errno.EPERM)
505
506         fh = next(self._filehandles_counter)
507         self._filehandles[fh] = FileHandle(fh, p)
508         self.inodes.touch(p)
509
510         # Normally, we will have received an "update" event if the
511         # parent collection is stale here. However, even if the parent
512         # collection hasn't changed, the manifest might have been
513         # fetched so long ago that the signatures on the data block
514         # locators have expired. Calling checkupdate() on all
515         # ancestors ensures the signatures will be refreshed if
516         # necessary.
517         while p.parent_inode in self.inodes:
518             if p == self.inodes[p.parent_inode]:
519                 break
520             p = self.inodes[p.parent_inode]
521             self.inodes.touch(p)
522             p.checkupdate()
523
524         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
525
526         return fh
527
528     @catch_exceptions
529     def read(self, fh, off, size):
530         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
531         self.read_ops_counter.add(1)
532
533         if fh in self._filehandles:
534             handle = self._filehandles[fh]
535         else:
536             raise llfuse.FUSEError(errno.EBADF)
537
538         self.inodes.touch(handle.obj)
539
540         r = handle.obj.readfrom(off, size, self.num_retries)
541         if r:
542             self.read_counter.add(len(r))
543         return r
544
545     @catch_exceptions
546     def write(self, fh, off, buf):
547         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
548         self.write_ops_counter.add(1)
549
550         if fh in self._filehandles:
551             handle = self._filehandles[fh]
552         else:
553             raise llfuse.FUSEError(errno.EBADF)
554
555         if not handle.obj.writable():
556             raise llfuse.FUSEError(errno.EPERM)
557
558         self.inodes.touch(handle.obj)
559
560         w = handle.obj.writeto(off, buf, self.num_retries)
561         if w:
562             self.write_counter.add(w)
563         return w
564
565     @catch_exceptions
566     def release(self, fh):
567         if fh in self._filehandles:
568             try:
569                 self._filehandles[fh].flush()
570             except Exception:
571                 raise
572             finally:
573                 self._filehandles[fh].release()
574                 del self._filehandles[fh]
575         self.inodes.inode_cache.cap_cache()
576
577     def releasedir(self, fh):
578         self.release(fh)
579
580     @catch_exceptions
581     def opendir(self, inode, ctx):
582         _logger.debug("arv-mount opendir: inode %i", inode)
583
584         if inode in self.inodes:
585             p = self.inodes[inode]
586         else:
587             raise llfuse.FUSEError(errno.ENOENT)
588
589         if not isinstance(p, Directory):
590             raise llfuse.FUSEError(errno.ENOTDIR)
591
592         fh = next(self._filehandles_counter)
593         if p.parent_inode in self.inodes:
594             parent = self.inodes[p.parent_inode]
595         else:
596             raise llfuse.FUSEError(errno.EIO)
597
598         # update atime
599         self.inodes.touch(p)
600
601         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
602         return fh
603
604     @catch_exceptions
605     def readdir(self, fh, off):
606         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
607
608         if fh in self._filehandles:
609             handle = self._filehandles[fh]
610         else:
611             raise llfuse.FUSEError(errno.EBADF)
612
613         e = off
614         while e < len(handle.entries):
615             if handle.entries[e][1].inode in self.inodes:
616                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
617             e += 1
618
619     @catch_exceptions
620     def statfs(self, ctx):
621         st = llfuse.StatvfsData()
622         st.f_bsize = 128 * 1024
623         st.f_blocks = 0
624         st.f_files = 0
625
626         st.f_bfree = 0
627         st.f_bavail = 0
628
629         st.f_ffree = 0
630         st.f_favail = 0
631
632         st.f_frsize = 0
633         return st
634
635     def _check_writable(self, inode_parent):
636         if not self.enable_write:
637             raise llfuse.FUSEError(errno.EROFS)
638
639         if inode_parent in self.inodes:
640             p = self.inodes[inode_parent]
641         else:
642             raise llfuse.FUSEError(errno.ENOENT)
643
644         if not isinstance(p, Directory):
645             raise llfuse.FUSEError(errno.ENOTDIR)
646
647         if not p.writable():
648             raise llfuse.FUSEError(errno.EPERM)
649
650         return p
651
652     @catch_exceptions
653     def create(self, inode_parent, name, mode, flags, ctx):
654         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
655
656         p = self._check_writable(inode_parent)
657         p.create(name)
658
659         # The file entry should have been implicitly created by callback.
660         f = p[name]
661         fh = next(self._filehandles_counter)
662         self._filehandles[fh] = FileHandle(fh, f)
663         self.inodes.touch(p)
664
665         f.inc_ref()
666         return (fh, self.getattr(f.inode))
667
668     @catch_exceptions
669     def mkdir(self, inode_parent, name, mode, ctx):
670         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
671
672         p = self._check_writable(inode_parent)
673         p.mkdir(name)
674
675         # The dir entry should have been implicitly created by callback.
676         d = p[name]
677
678         d.inc_ref()
679         return self.getattr(d.inode)
680
681     @catch_exceptions
682     def unlink(self, inode_parent, name, ctx):
683         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
684         p = self._check_writable(inode_parent)
685         p.unlink(name)
686
687     @catch_exceptions
688     def rmdir(self, inode_parent, name, ctx):
689         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
690         p = self._check_writable(inode_parent)
691         p.rmdir(name)
692
693     @catch_exceptions
694     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx):
695         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
696         src = self._check_writable(inode_parent_old)
697         dest = self._check_writable(inode_parent_new)
698         dest.rename(name_old, name_new, src)
699
700     @catch_exceptions
701     def flush(self, fh):
702         if fh in self._filehandles:
703             self._filehandles[fh].flush()
704
705     def fsync(self, fh, datasync):
706         self.flush(fh)
707
708     def fsyncdir(self, fh, datasync):
709         self.flush(fh)