10516: set finished_at to updated_at on pipeline_instances if the pipeline is finishe...
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 """FUSE driver for Arvados Keep
2
3 Architecture:
4
5 There is one `Operations` object per mount point.  It is the entry point for all
6 read and write requests from the llfuse module.
7
8 The operations object owns an `Inodes` object.  The inodes object stores the
9 mapping from numeric inode (used throughout the file system API to uniquely
10 identify files) to the Python objects that implement files and directories.
11
12 The `Inodes` object owns an `InodeCache` object.  The inode cache records the
13 memory footprint of file system objects and when they are last used.  When the
14 cache limit is exceeded, the least recently used objects are cleared.
15
16 File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
17
18 File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
19 which implement actual reads and writes.
20
21 Directory objects inherit from `fusedir.Directory`.  The directory object wraps
22 a Python dict which stores the mapping from filenames to directory entries.
23 Directory contents can be accessed through the Python operators such as `[]`
24 and `in`.  These methods automatically check if the directory is fresh (up to
25 date) or stale (needs update) and will call `update` if necessary before
26 returing a result.
27
28 The general FUSE operation flow is as follows:
29
30 - The request handler is called with either an inode or file handle that is the
31   subject of the operation.
32
33 - Look up the inode using the Inodes table or the file handle in the
34   filehandles table to get the file system object.
35
36 - For methods that alter files or directories, check that the operation is
37   valid and permitted using _check_writable().
38
39 - Call the relevant method on the file system object.
40
41 - Return the result.
42
43 The FUSE driver supports the Arvados event bus.  When an event is received for
44 an object that is live in the inode cache, that object is immediately updated.
45
46 """
47
48 import os
49 import sys
50 import llfuse
51 import errno
52 import stat
53 import threading
54 import arvados
55 import pprint
56 import arvados.events
57 import re
58 import apiclient
59 import json
60 import logging
61 import time
62 import _strptime
63 import calendar
64 import threading
65 import itertools
66 import ciso8601
67 import collections
68 import functools
69 import arvados.keep
70
71 import Queue
72
73 # Default _notify_queue has a limit of 1000 items, but it really needs to be
74 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
75 # details.
76
77 if hasattr(llfuse, 'capi'):
78     # llfuse < 0.42
79     llfuse.capi._notify_queue = Queue.Queue()
80 else:
81     # llfuse >= 0.42
82     llfuse._notify_queue = Queue.Queue()
83
84 from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
85 from fusefile import StringFile, FuseArvadosFile
86
87 _logger = logging.getLogger('arvados.arvados_fuse')
88
89 # Uncomment this to enable llfuse debug logging.
90 # log_handler = logging.StreamHandler()
91 # llogger = logging.getLogger('llfuse')
92 # llogger.addHandler(log_handler)
93 # llogger.setLevel(logging.DEBUG)
94
95 class Handle(object):
96     """Connects a numeric file handle to a File or Directory object that has
97     been opened by the client."""
98
99     def __init__(self, fh, obj):
100         self.fh = fh
101         self.obj = obj
102         self.obj.inc_use()
103
104     def release(self):
105         self.obj.dec_use()
106
107     def flush(self):
108         if self.obj.writable():
109             return self.obj.flush()
110
111
112 class FileHandle(Handle):
113     """Connects a numeric file handle to a File  object that has
114     been opened by the client."""
115     pass
116
117
118 class DirectoryHandle(Handle):
119     """Connects a numeric file handle to a Directory object that has
120     been opened by the client."""
121
122     def __init__(self, fh, dirobj, entries):
123         super(DirectoryHandle, self).__init__(fh, dirobj)
124         self.entries = entries
125
126
127 class InodeCache(object):
128     """Records the memory footprint of objects and when they are last used.
129
130     When the cache limit is exceeded, the least recently used objects are
131     cleared.  Clearing the object means discarding its contents to release
132     memory.  The next time the object is accessed, it must be re-fetched from
133     the server.  Note that the inode cache limit is a soft limit; the cache
134     limit may be exceeded if necessary to load very large objects, it may also
135     be exceeded if open file handles prevent objects from being cleared.
136
137     """
138
139     def __init__(self, cap, min_entries=4):
140         self._entries = collections.OrderedDict()
141         self._by_uuid = {}
142         self.cap = cap
143         self._total = 0
144         self.min_entries = min_entries
145
146     def total(self):
147         return self._total
148
149     def _remove(self, obj, clear):
150         if clear:
151             if obj.in_use():
152                 _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
153                 return
154             if obj.has_ref(True):
155                 obj.kernel_invalidate()
156                 _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
157                 return
158             obj.clear()
159
160         # The llfuse lock is released in del_entry(), which is called by
161         # Directory.clear().  While the llfuse lock is released, it can happen
162         # that a reentrant call removes this entry before this call gets to it.
163         # Ensure that the entry is still valid before trying to remove it.
164         if obj.inode not in self._entries:
165             return
166
167         self._total -= obj.cache_size
168         del self._entries[obj.inode]
169         if obj.cache_uuid:
170             self._by_uuid[obj.cache_uuid].remove(obj)
171             if not self._by_uuid[obj.cache_uuid]:
172                 del self._by_uuid[obj.cache_uuid]
173             obj.cache_uuid = None
174         if clear:
175             _logger.debug("InodeCache cleared inode %i total now %i", obj.inode, self._total)
176
177     def cap_cache(self):
178         if self._total > self.cap:
179             for ent in self._entries.values():
180                 if self._total < self.cap or len(self._entries) < self.min_entries:
181                     break
182                 self._remove(ent, True)
183
184     def manage(self, obj):
185         if obj.persisted():
186             obj.cache_size = obj.objsize()
187             self._entries[obj.inode] = obj
188             obj.cache_uuid = obj.uuid()
189             if obj.cache_uuid:
190                 if obj.cache_uuid not in self._by_uuid:
191                     self._by_uuid[obj.cache_uuid] = [obj]
192                 else:
193                     if obj not in self._by_uuid[obj.cache_uuid]:
194                         self._by_uuid[obj.cache_uuid].append(obj)
195             self._total += obj.objsize()
196             _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
197             self.cap_cache()
198
199     def touch(self, obj):
200         if obj.persisted():
201             if obj.inode in self._entries:
202                 self._remove(obj, False)
203             self.manage(obj)
204
205     def unmanage(self, obj):
206         if obj.persisted() and obj.inode in self._entries:
207             self._remove(obj, True)
208
209     def find_by_uuid(self, uuid):
210         return self._by_uuid.get(uuid, [])
211
212     def clear(self):
213         self._entries.clear()
214         self._by_uuid.clear()
215         self._total = 0
216
217 class Inodes(object):
218     """Manage the set of inodes.  This is the mapping from a numeric id
219     to a concrete File or Directory object"""
220
221     def __init__(self, inode_cache, encoding="utf-8"):
222         self._entries = {}
223         self._counter = itertools.count(llfuse.ROOT_INODE)
224         self.inode_cache = inode_cache
225         self.encoding = encoding
226         self.deferred_invalidations = []
227
228     def __getitem__(self, item):
229         return self._entries[item]
230
231     def __setitem__(self, key, item):
232         self._entries[key] = item
233
234     def __iter__(self):
235         return self._entries.iterkeys()
236
237     def items(self):
238         return self._entries.items()
239
240     def __contains__(self, k):
241         return k in self._entries
242
243     def touch(self, entry):
244         entry._atime = time.time()
245         self.inode_cache.touch(entry)
246
247     def add_entry(self, entry):
248         entry.inode = next(self._counter)
249         if entry.inode == llfuse.ROOT_INODE:
250             entry.inc_ref()
251         self._entries[entry.inode] = entry
252         self.inode_cache.manage(entry)
253         return entry
254
255     def del_entry(self, entry):
256         if entry.ref_count == 0:
257             self.inode_cache.unmanage(entry)
258             del self._entries[entry.inode]
259             with llfuse.lock_released:
260                 entry.finalize()
261             self.invalidate_inode(entry.inode)
262             entry.inode = None
263         else:
264             entry.dead = True
265             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
266
267     def invalidate_inode(self, inode):
268         llfuse.invalidate_inode(inode)
269
270     def invalidate_entry(self, inode, name):
271         llfuse.invalidate_entry(inode, name.encode(self.encoding))
272
273     def clear(self):
274         self.inode_cache.clear()
275
276         for k,v in self._entries.items():
277             try:
278                 v.finalize()
279             except Exception as e:
280                 _logger.exception("Error during finalize of inode %i", k)
281
282         self._entries.clear()
283
284
285 def catch_exceptions(orig_func):
286     """Catch uncaught exceptions and log them consistently."""
287
288     @functools.wraps(orig_func)
289     def catch_exceptions_wrapper(self, *args, **kwargs):
290         try:
291             return orig_func(self, *args, **kwargs)
292         except llfuse.FUSEError:
293             raise
294         except EnvironmentError as e:
295             raise llfuse.FUSEError(e.errno)
296         except arvados.errors.KeepWriteError as e:
297             _logger.error("Keep write error: " + str(e))
298             raise llfuse.FUSEError(errno.EIO)
299         except arvados.errors.NotFoundError as e:
300             _logger.error("Block not found error: " + str(e))
301             raise llfuse.FUSEError(errno.EIO)
302         except:
303             _logger.exception("Unhandled exception during FUSE operation")
304             raise llfuse.FUSEError(errno.EIO)
305
306     return catch_exceptions_wrapper
307
308
309 class Operations(llfuse.Operations):
310     """This is the main interface with llfuse.
311
312     The methods on this object are called by llfuse threads to service FUSE
313     events to query and read from the file system.
314
315     llfuse has its own global lock which is acquired before calling a request handler,
316     so request handlers do not run concurrently unless the lock is explicitly released
317     using 'with llfuse.lock_released:'
318
319     """
320
321     def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
322         super(Operations, self).__init__()
323
324         self._api_client = api_client
325
326         if not inode_cache:
327             inode_cache = InodeCache(cap=256*1024*1024)
328         self.inodes = Inodes(inode_cache, encoding=encoding)
329         self.uid = uid
330         self.gid = gid
331         self.enable_write = enable_write
332
333         # dict of inode to filehandle
334         self._filehandles = {}
335         self._filehandles_counter = itertools.count(0)
336
337         # Other threads that need to wait until the fuse driver
338         # is fully initialized should wait() on this event object.
339         self.initlock = threading.Event()
340
341         # If we get overlapping shutdown events (e.g., fusermount -u
342         # -z and operations.destroy()) llfuse calls forget() on inodes
343         # that have already been deleted. To avoid this, we make
344         # forget() a no-op if called after destroy().
345         self._shutdown_started = threading.Event()
346
347         self.num_retries = num_retries
348
349         self.read_counter = arvados.keep.Counter()
350         self.write_counter = arvados.keep.Counter()
351         self.read_ops_counter = arvados.keep.Counter()
352         self.write_ops_counter = arvados.keep.Counter()
353
354         self.events = None
355
356     def init(self):
357         # Allow threads that are waiting for the driver to be finished
358         # initializing to continue
359         self.initlock.set()
360
361     @catch_exceptions
362     def destroy(self):
363         self._shutdown_started.set()
364         if self.events:
365             self.events.close()
366             self.events = None
367
368         if llfuse.lock.acquire():
369             # llfuse < 0.42
370             self.inodes.clear()
371             llfuse.lock.release()
372         else:
373             # llfuse >= 0.42
374             self.inodes.clear()
375
376     def access(self, inode, mode, ctx):
377         return True
378
379     def listen_for_events(self):
380         self.events = arvados.events.subscribe(
381             self._api_client,
382             [["event_type", "in", ["create", "update", "delete"]]],
383             self.on_event)
384
385     @catch_exceptions
386     def on_event(self, ev):
387         if 'event_type' not in ev:
388             return
389         with llfuse.lock:
390             new_attrs = (ev.get("properties") or {}).get("new_attributes") or {}
391             pdh = new_attrs.get("portable_data_hash")
392             # new_attributes.modified_at currently lacks
393             # subsecond precision (see #6347) so use event_at
394             # which should always be the same.
395             stamp = ev.get("event_at")
396
397             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
398                 item.invalidate()
399                 if stamp and pdh and ev.get("object_kind") == "arvados#collection":
400                     item.update(to_record_version=(stamp, pdh))
401                 else:
402                     item.update()
403
404             oldowner = ((ev.get("properties") or {}).get("old_attributes") or {}).get("owner_uuid")
405             newowner = ev.get("object_owner_uuid")
406             for parent in (
407                     self.inodes.inode_cache.find_by_uuid(oldowner) +
408                     self.inodes.inode_cache.find_by_uuid(newowner)):
409                 parent.invalidate()
410                 parent.update()
411
412     @catch_exceptions
413     def getattr(self, inode, ctx=None):
414         if inode not in self.inodes:
415             raise llfuse.FUSEError(errno.ENOENT)
416
417         e = self.inodes[inode]
418
419         entry = llfuse.EntryAttributes()
420         entry.st_ino = inode
421         entry.generation = 0
422         entry.entry_timeout = 60 if e.allow_dirent_cache else 0
423         entry.attr_timeout = 60 if e.allow_attr_cache else 0
424
425         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
426         if isinstance(e, Directory):
427             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
428         else:
429             entry.st_mode |= stat.S_IFREG
430             if isinstance(e, FuseArvadosFile):
431                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
432
433         if self.enable_write and e.writable():
434             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
435
436         entry.st_nlink = 1
437         entry.st_uid = self.uid
438         entry.st_gid = self.gid
439         entry.st_rdev = 0
440
441         entry.st_size = e.size()
442
443         entry.st_blksize = 512
444         entry.st_blocks = (entry.st_size/512)+1
445         if hasattr(entry, 'st_atime_ns'):
446             # llfuse >= 0.42
447             entry.st_atime_ns = int(e.atime() * 1000000000)
448             entry.st_mtime_ns = int(e.mtime() * 1000000000)
449             entry.st_ctime_ns = int(e.mtime() * 1000000000)
450         else:
451             # llfuse < 0.42
452             entry.st_atime = int(e.atime)
453             entry.st_mtime = int(e.mtime)
454             entry.st_ctime = int(e.mtime)
455
456         return entry
457
458     @catch_exceptions
459     def setattr(self, inode, attr, fields=None, fh=None, ctx=None):
460         entry = self.getattr(inode)
461
462         if fh is not None and fh in self._filehandles:
463             handle = self._filehandles[fh]
464             e = handle.obj
465         else:
466             e = self.inodes[inode]
467
468         if fields is None:
469             # llfuse < 0.42
470             update_size = attr.st_size is not None
471         else:
472             # llfuse >= 0.42
473             update_size = fields.update_size
474         if update_size and isinstance(e, FuseArvadosFile):
475             with llfuse.lock_released:
476                 e.arvfile.truncate(attr.st_size)
477                 entry.st_size = e.arvfile.size()
478
479         return entry
480
481     @catch_exceptions
482     def lookup(self, parent_inode, name, ctx=None):
483         name = unicode(name, self.inodes.encoding)
484         inode = None
485
486         if name == '.':
487             inode = parent_inode
488         else:
489             if parent_inode in self.inodes:
490                 p = self.inodes[parent_inode]
491                 self.inodes.touch(p)
492                 if name == '..':
493                     inode = p.parent_inode
494                 elif isinstance(p, Directory) and name in p:
495                     inode = p[name].inode
496
497         if inode != None:
498             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
499                       parent_inode, name, inode)
500             self.inodes[inode].inc_ref()
501             return self.getattr(inode)
502         else:
503             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
504                       parent_inode, name)
505             raise llfuse.FUSEError(errno.ENOENT)
506
507     @catch_exceptions
508     def forget(self, inodes):
509         if self._shutdown_started.is_set():
510             return
511         for inode, nlookup in inodes:
512             ent = self.inodes[inode]
513             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
514             if ent.dec_ref(nlookup) == 0 and ent.dead:
515                 self.inodes.del_entry(ent)
516
517     @catch_exceptions
518     def open(self, inode, flags, ctx=None):
519         if inode in self.inodes:
520             p = self.inodes[inode]
521         else:
522             raise llfuse.FUSEError(errno.ENOENT)
523
524         if isinstance(p, Directory):
525             raise llfuse.FUSEError(errno.EISDIR)
526
527         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
528             raise llfuse.FUSEError(errno.EPERM)
529
530         fh = next(self._filehandles_counter)
531         self._filehandles[fh] = FileHandle(fh, p)
532         self.inodes.touch(p)
533
534         # Normally, we will have received an "update" event if the
535         # parent collection is stale here. However, even if the parent
536         # collection hasn't changed, the manifest might have been
537         # fetched so long ago that the signatures on the data block
538         # locators have expired. Calling checkupdate() on all
539         # ancestors ensures the signatures will be refreshed if
540         # necessary.
541         while p.parent_inode in self.inodes:
542             if p == self.inodes[p.parent_inode]:
543                 break
544             p = self.inodes[p.parent_inode]
545             self.inodes.touch(p)
546             p.checkupdate()
547
548         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
549
550         return fh
551
552     @catch_exceptions
553     def read(self, fh, off, size):
554         _logger.debug("arv-mount read fh %i off %i size %i", fh, off, size)
555         self.read_ops_counter.add(1)
556
557         if fh in self._filehandles:
558             handle = self._filehandles[fh]
559         else:
560             raise llfuse.FUSEError(errno.EBADF)
561
562         self.inodes.touch(handle.obj)
563
564         r = handle.obj.readfrom(off, size, self.num_retries)
565         if r:
566             self.read_counter.add(len(r))
567         return r
568
569     @catch_exceptions
570     def write(self, fh, off, buf):
571         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
572         self.write_ops_counter.add(1)
573
574         if fh in self._filehandles:
575             handle = self._filehandles[fh]
576         else:
577             raise llfuse.FUSEError(errno.EBADF)
578
579         if not handle.obj.writable():
580             raise llfuse.FUSEError(errno.EPERM)
581
582         self.inodes.touch(handle.obj)
583
584         w = handle.obj.writeto(off, buf, self.num_retries)
585         if w:
586             self.write_counter.add(w)
587         return w
588
589     @catch_exceptions
590     def release(self, fh):
591         if fh in self._filehandles:
592             try:
593                 self._filehandles[fh].flush()
594             except Exception:
595                 raise
596             finally:
597                 self._filehandles[fh].release()
598                 del self._filehandles[fh]
599         self.inodes.inode_cache.cap_cache()
600
601     def releasedir(self, fh):
602         self.release(fh)
603
604     @catch_exceptions
605     def opendir(self, inode, ctx=None):
606         _logger.debug("arv-mount opendir: inode %i", inode)
607
608         if inode in self.inodes:
609             p = self.inodes[inode]
610         else:
611             raise llfuse.FUSEError(errno.ENOENT)
612
613         if not isinstance(p, Directory):
614             raise llfuse.FUSEError(errno.ENOTDIR)
615
616         fh = next(self._filehandles_counter)
617         if p.parent_inode in self.inodes:
618             parent = self.inodes[p.parent_inode]
619         else:
620             raise llfuse.FUSEError(errno.EIO)
621
622         # update atime
623         self.inodes.touch(p)
624
625         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
626         return fh
627
628     @catch_exceptions
629     def readdir(self, fh, off):
630         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
631
632         if fh in self._filehandles:
633             handle = self._filehandles[fh]
634         else:
635             raise llfuse.FUSEError(errno.EBADF)
636
637         e = off
638         while e < len(handle.entries):
639             if handle.entries[e][1].inode in self.inodes:
640                 yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
641             e += 1
642
643     @catch_exceptions
644     def statfs(self, ctx=None):
645         st = llfuse.StatvfsData()
646         st.f_bsize = 128 * 1024
647         st.f_blocks = 0
648         st.f_files = 0
649
650         st.f_bfree = 0
651         st.f_bavail = 0
652
653         st.f_ffree = 0
654         st.f_favail = 0
655
656         st.f_frsize = 0
657         return st
658
659     def _check_writable(self, inode_parent):
660         if not self.enable_write:
661             raise llfuse.FUSEError(errno.EROFS)
662
663         if inode_parent in self.inodes:
664             p = self.inodes[inode_parent]
665         else:
666             raise llfuse.FUSEError(errno.ENOENT)
667
668         if not isinstance(p, Directory):
669             raise llfuse.FUSEError(errno.ENOTDIR)
670
671         if not p.writable():
672             raise llfuse.FUSEError(errno.EPERM)
673
674         return p
675
676     @catch_exceptions
677     def create(self, inode_parent, name, mode, flags, ctx=None):
678         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
679
680         p = self._check_writable(inode_parent)
681         p.create(name)
682
683         # The file entry should have been implicitly created by callback.
684         f = p[name]
685         fh = next(self._filehandles_counter)
686         self._filehandles[fh] = FileHandle(fh, f)
687         self.inodes.touch(p)
688
689         f.inc_ref()
690         return (fh, self.getattr(f.inode))
691
692     @catch_exceptions
693     def mkdir(self, inode_parent, name, mode, ctx=None):
694         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
695
696         p = self._check_writable(inode_parent)
697         p.mkdir(name)
698
699         # The dir entry should have been implicitly created by callback.
700         d = p[name]
701
702         d.inc_ref()
703         return self.getattr(d.inode)
704
705     @catch_exceptions
706     def unlink(self, inode_parent, name, ctx=None):
707         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
708         p = self._check_writable(inode_parent)
709         p.unlink(name)
710
711     @catch_exceptions
712     def rmdir(self, inode_parent, name, ctx=None):
713         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
714         p = self._check_writable(inode_parent)
715         p.rmdir(name)
716
717     @catch_exceptions
718     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
719         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
720         src = self._check_writable(inode_parent_old)
721         dest = self._check_writable(inode_parent_new)
722         dest.rename(name_old, name_new, src)
723
724     @catch_exceptions
725     def flush(self, fh):
726         if fh in self._filehandles:
727             self._filehandles[fh].flush()
728
729     def fsync(self, fh, datasync):
730         self.flush(fh)
731
732     def fsyncdir(self, fh, datasync):
733         self.flush(fh)