Merge branch 'master' into 3198-writable-fuse
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24 import collections
25 import functools
26
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
29
_logger = logging.getLogger('arvados.arvados_fuse')

# Route llfuse's own log records through a StreamHandler (stderr by default)
# and let everything down to DEBUG through.
llogger = logging.getLogger('llfuse')
llogger.setLevel(logging.DEBUG)
log_handler = logging.StreamHandler()
llogger.addHandler(log_handler)
36
class Handle(object):
    """Pair an open file-handle number with the File or Directory it refers to.

    Creating a Handle marks the wrapped object as in use (inc_use); release()
    undoes that mark (dec_use).
    """

    def __init__(self, fh, obj):
        self.fh, self.obj = fh, obj
        obj.inc_use()

    def release(self):
        """Remove the in-use mark that __init__ placed on the wrapped object."""
        self.obj.dec_use()

    def flush(self):
        """Flush the wrapped object with the llfuse global lock released."""
        with llfuse.lock_released:
            flushed = self.obj.flush()
        return flushed
52
53
class FileHandle(Handle):
    """Handle for a regular (non-directory) object opened by a FUSE client.

    Adds nothing to Handle; the distinct type marks the handle as a file.
    """
58
59
class DirectoryHandle(Handle):
    """Handle for a Directory opened by a FUSE client.

    ``entries`` holds the (name, object) listing captured when the directory
    was opened; readdir walks it by offset.
    """

    def __init__(self, fh, dirobj, entries):
        self.entries = entries
        super(DirectoryHandle, self).__init__(fh, dirobj)
65         super(DirectoryHandle, self).__init__(fh, dirobj)
66         self.entries = entries
67
68
class InodeCache(object):
    """Size-bounded cache of persisted inode objects, evicted oldest-first.

    Managed objects live in an OrderedDict keyed by ``cache_priority``, a
    number taken from an increasing counter each time the object is managed
    or touched, so iteration order runs from least to most recently used.
    Whenever the summed ``cache_size`` exceeds ``cap``, cap_cache() asks the
    oldest objects to clear() themselves.  A secondary index maps object
    UUIDs to objects for find().
    """

    def __init__(self, cap, min_entries=4):
        self._entries = collections.OrderedDict()
        self._by_uuid = {}
        self._counter = itertools.count(1)
        self.cap = cap
        self._total = 0
        self.min_entries = min_entries

    def total(self):
        """Return the summed cache_size of all managed objects."""
        return self._total

    def _remove(self, obj, clear):
        """Stop managing obj, first asking it to clear() when clear is true.

        Returns False, leaving obj managed, if obj.clear() refuses; True
        otherwise.
        """
        if clear and not obj.clear():
            _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
            return False
        self._total -= obj.cache_size
        del self._entries[obj.cache_priority]
        if obj.cache_uuid:
            # A later object with the same UUID may have replaced this one in
            # the index; only drop the index entry if it still points here.
            if self._by_uuid.get(obj.cache_uuid) is obj:
                del self._by_uuid[obj.cache_uuid]
            obj.cache_uuid = None
        _logger.debug("Cleared %s total now %i", obj, self._total)
        return True

    def cap_cache(self):
        """Evict least-recently-used objects until the total is under cap."""
        _logger.debug("total is %i cap is %i", self._total, self.cap)
        if self._total > self.cap:
            # Iterate over a copy: _remove() mutates self._entries.
            for key in list(self._entries.keys()):
                if self._total < self.cap or len(self._entries) < self.min_entries:
                    break
                self._remove(self._entries[key], True)

    def manage(self, obj):
        """Start tracking obj (if persisted) as the most recently used entry.

        Non-persisted objects just get cache_priority = None.
        """
        if obj.persisted():
            obj.cache_priority = next(self._counter)
            obj.cache_size = obj.objsize()
            self._entries[obj.cache_priority] = obj
            obj.cache_uuid = obj.uuid()
            if obj.cache_uuid:
                self._by_uuid[obj.cache_uuid] = obj
            # Add exactly the amount _remove() will later subtract.
            self._total += obj.cache_size
            _logger.debug("Managing %s total now %i", obj, self._total)
            self.cap_cache()
        else:
            obj.cache_priority = None

    def touch(self, obj):
        """Mark obj as most recently used, re-reading its size."""
        if obj.persisted():
            if obj.cache_priority in self._entries:
                self._remove(obj, False)
            self.manage(obj)
            _logger.debug("Touched %s (%i) total now %i", obj, obj.cache_size, self._total)

    def unmanage(self, obj):
        """Clear obj and stop tracking it, if it is currently managed."""
        if obj.persisted() and obj.cache_priority in self._entries:
            self._remove(obj, True)

    def find(self, uuid):
        """Return the managed object indexed under uuid, or None."""
        return self._by_uuid.get(uuid)
128
class Inodes(object):
    """Manage the set of inodes.  This is the mapping from a numeric id
    to a concrete File or Directory object.

    New entries are numbered from llfuse.ROOT_INODE upward and registered
    with the supplied inode cache.
    """

    def __init__(self, inode_cache):
        self._entries = {}
        self._counter = itertools.count(llfuse.ROOT_INODE)
        self.inode_cache = inode_cache

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        # iter() over a dict yields its keys on Python 2 and 3 alike;
        # dict.iterkeys() exists only on Python 2.
        return iter(self._entries)

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def touch(self, entry):
        """Record an access time on entry and mark it recently used in the cache."""
        entry._atime = time.time()
        self.inode_cache.touch(entry)

    def add_entry(self, entry):
        """Assign entry the next inode number, store it, and return it."""
        entry.inode = next(self._counter)
        self._entries[entry.inode] = entry
        self.inode_cache.manage(entry)
        return entry

    def del_entry(self, entry):
        """Remove entry now if unreferenced; otherwise mark it dead.

        Dead entries are removed later, when forget() drops their reference
        count to zero.
        """
        # These are routine lifecycle events, so they are logged at DEBUG
        # (they were previously emitted as warnings via the deprecated
        # Logger.warn alias).
        if entry.ref_count == 0:
            _logger.debug("Deleting inode %i", entry.inode)
            self.inode_cache.unmanage(entry)
            llfuse.invalidate_inode(entry.inode)
            del self._entries[entry.inode]
        else:
            _logger.debug("Inode %i has refcount %i", entry.inode, entry.ref_count)
            entry.dead = True
171             entry.dead = True
172
def catch_exceptions(orig_func):
    """Decorate a FUSE request handler so every failure reaches llfuse as a FUSEError.

    - llfuse.FUSEError propagates unchanged.
    - EnvironmentError (IOError/OSError) becomes a FUSEError with the same
      errno, or EIO (logged) when the exception carries no errno.
    - Any other Exception is logged with its traceback and becomes EIO.
    BaseExceptions that are not Exceptions (KeyboardInterrupt, SystemExit,
    GeneratorExit) are no longer converted.

    Generator handlers (such as readdir) get a generator wrapper, so errors
    raised while the caller iterates the result are translated too.  A plain
    wrapper would only cover creation of the generator object.
    """
    import inspect

    def fuse_error_for(exc):
        # Must be called from inside an ``except`` block: _logger.exception
        # records the exception currently being handled.
        if isinstance(exc, EnvironmentError) and exc.errno:
            return llfuse.FUSEError(exc.errno)
        _logger.exception("Unhandled exception during FUSE operation")
        return llfuse.FUSEError(errno.EIO)

    if inspect.isgeneratorfunction(orig_func):
        @functools.wraps(orig_func)
        def catch_exceptions_wrapper(self, *args, **kwargs):
            try:
                for item in orig_func(self, *args, **kwargs):
                    yield item
            except llfuse.FUSEError:
                raise
            except Exception as e:
                raise fuse_error_for(e)
    else:
        @functools.wraps(orig_func)
        def catch_exceptions_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except llfuse.FUSEError:
                raise
            except Exception as e:
                raise fuse_error_for(e)

    return catch_exceptions_wrapper
187
188
class Operations(llfuse.Operations):
    """This is the main interface with llfuse.

    The methods on this object are called by llfuse threads to service FUSE
    events to query and read from the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using 'with llfuse.lock_released:'

    """

    def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
        """Set up an empty inode table.

        uid, gid -- reported as owner of every file and directory.
        encoding -- used to convert between the byte-string names llfuse
            passes in/out and the names stored in directories.
        inode_cache -- cache for inode objects; a new InodeCache capped at
            256 MiB is created when None.
        num_retries -- passed to readfrom()/writeto() on file objects.
        """
        super(Operations, self).__init__()

        if inode_cache is None:
            inode_cache = InodeCache(cap=256*1024*1024)
        self.inodes = Inodes(inode_cache)
        self.uid = uid
        self.gid = gid
        self.encoding = encoding

        # dict of file handle number -> FileHandle / DirectoryHandle
        self._filehandles = {}
        self._filehandles_counter = 1

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

        self.num_retries = num_retries

        # Event subscription set by listen_for_events(); closed in destroy().
        self.events = None

    def _new_fh(self):
        """Allocate and return the next unused file handle number."""
        fh = self._filehandles_counter
        self._filehandles_counter += 1
        return fh

    def init(self):
        # Allow threads that are waiting for the driver to be finished
        # initializing to continue
        self.initlock.set()

    def destroy(self):
        if self.events:
            self.events.close()

    def access(self, inode, mode, ctx):
        return True

    def listen_for_events(self, api_client):
        """Subscribe to create/update/delete events, delivered to on_event().

        The subscription is stored in self.events (previously it went to a
        misspelled self.event, so destroy() never closed it).
        """
        self.events = arvados.events.subscribe(api_client,
                                 [["event_type", "in", ["create", "update", "delete"]]],
                                 self.on_event)

    def on_event(self, ev):
        """Invalidate and update cached objects named by an API event.

        Both the object itself and its owner are refreshed if they are in
        the inode cache.  Missing uuid fields are tolerated rather than
        raising KeyError.
        """
        if 'event_type' in ev:
            with llfuse.lock:
                for uuid in (ev.get("object_uuid"), ev.get("object_owner_uuid")):
                    item = self.inodes.inode_cache.find(uuid)
                    if item:
                        item.invalidate()
                        item.update()

    @catch_exceptions
    def getattr(self, inode):
        """Return llfuse.EntryAttributes for inode; ENOENT if it is unknown."""
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.generation = 0
        entry.entry_timeout = 300
        entry.attr_timeout = 300

        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        else:
            entry.st_mode |= stat.S_IFREG
            if isinstance(e, FuseArvadosFile):
                entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

        if e.writable():
            entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH

        entry.st_nlink = 1
        entry.st_uid = self.uid
        entry.st_gid = self.gid
        entry.st_rdev = 0

        size = e.size()
        entry.st_size = size

        entry.st_blksize = 512
        # Number of 512-byte blocks needed to hold `size` bytes, rounded up
        # (the old (size/512)+1 overcounted exact multiples of 512 and 0).
        entry.st_blocks = (size + 511) // 512
        entry.st_atime = int(e.atime())
        entry.st_mtime = int(e.mtime())
        entry.st_ctime = int(e.mtime())

        return entry

    @catch_exceptions
    def lookup(self, parent_inode, name):
        """Resolve name in parent_inode, bump its ref count and return its attributes.

        Raises ENOENT when the name is not found.
        """
        name = unicode(name, self.encoding)
        inode = None

        if name == '.':
            inode = parent_inode
        elif parent_inode in self.inodes:
            p = self.inodes[parent_inode]
            if name == '..':
                inode = p.parent_inode
            elif isinstance(p, Directory) and name in p:
                inode = p[name].inode

        if inode is not None:
            _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                      parent_inode, name, inode)
            self.inodes[inode].inc_ref()
            return self.getattr(inode)
        else:
            _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
                      parent_inode, name)
            raise llfuse.FUSEError(errno.ENOENT)

    @catch_exceptions
    def forget(self, inodes):
        """Drop lookup references; delete dead inodes whose count reaches zero.

        inodes is an iterable of (inode, nlookup) pairs.  Inodes that are no
        longer in the table are skipped, so one stale entry does not abort
        processing of the remaining pairs.
        """
        for inode, nlookup in inodes:
            _logger.debug("arv-mount forget: %i %i", inode, nlookup)
            if inode not in self.inodes:
                continue
            ent = self.inodes[inode]
            if ent.dec_ref(nlookup) == 0 and ent.dead:
                self.inodes.del_entry(ent)

    @catch_exceptions
    def open(self, inode, flags):
        """Open a file and return a new file handle number.

        Raises ENOENT for unknown inodes, EISDIR for directories and EPERM
        when write access is requested on a non-writable file.
        """
        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        if (flags & (os.O_WRONLY | os.O_RDWR)) and not p.writable():
            raise llfuse.FUSEError(errno.EPERM)

        fh = self._new_fh()
        self._filehandles[fh] = FileHandle(fh, p)
        self.inodes.touch(p)
        return fh

    @catch_exceptions
    def read(self, fh, off, size):
        """Read up to size bytes at offset off; EBADF for unknown handles."""
        _logger.debug("arv-mount read %i %i %i", fh, off, size)
        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        self.inodes.touch(handle.obj)

        try:
            with llfuse.lock_released:
                return handle.obj.readfrom(off, size, self.num_retries)
        except arvados.errors.NotFoundError as e:
            _logger.warning("Block not found: " + str(e))
            raise llfuse.FUSEError(errno.EIO)

    @catch_exceptions
    def write(self, fh, off, buf):
        """Write buf at offset off; EBADF for unknown handles, EPERM if read-only."""
        _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        if not handle.obj.writable():
            raise llfuse.FUSEError(errno.EPERM)

        self.inodes.touch(handle.obj)

        with llfuse.lock_released:
            return handle.obj.writeto(off, buf, self.num_retries)

    @catch_exceptions
    def release(self, fh):
        """Flush and close handle fh, then trim the inode cache.

        The handle is always released and forgotten, even if flushing fails.
        An EnvironmentError from the flush is reported to the caller (EIO if
        it has no errno); any other flush error is only logged.
        """
        if fh in self._filehandles:
            handle = self._filehandles[fh]
            try:
                handle.flush()
            except EnvironmentError as e:
                raise llfuse.FUSEError(e.errno or errno.EIO)
            except Exception:
                _logger.exception("Flush error")
            finally:
                # Previously a failed flush skipped this, leaking the handle
                # and the in-use mark on its object.
                handle.release()
                del self._filehandles[fh]
        self.inodes.inode_cache.cap_cache()

    def releasedir(self, fh):
        self.release(fh)

    @catch_exceptions
    def opendir(self, inode):
        """Open a directory, snapshotting its listing, and return a handle number.

        Raises ENOENT for unknown inodes, ENOTDIR for non-directories and EIO
        when the directory's parent inode is missing.
        """
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
        else:
            raise llfuse.FUSEError(errno.EIO)

        # update atime
        self.inodes.touch(p)

        fh = self._new_fh()
        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
        return fh

    @catch_exceptions
    def readdir(self, fh, off):
        """Yield (encoded name, attributes, next offset) starting at entry off.

        Entries whose inode has since disappeared, or whose name cannot be
        encoded with self.encoding, are skipped.
        """
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        _logger.debug("arv-mount handle.dirobj %s", handle.obj)

        for index in range(off, len(handle.entries)):
            name, obj = handle.entries[index]
            if obj.inode not in self.inodes:
                continue
            try:
                encoded = name.encode(self.encoding)
            except UnicodeEncodeError:
                continue
            yield (encoded, self.getattr(obj.inode), index + 1)

    @catch_exceptions
    def statfs(self):
        """Report filesystem statistics: all zero apart from a 64 KiB block size."""
        st = llfuse.StatvfsData()
        st.f_bsize = 64 * 1024
        st.f_blocks = 0
        st.f_files = 0

        st.f_bfree = 0
        st.f_bavail = 0

        st.f_ffree = 0
        st.f_favail = 0

        st.f_frsize = 0
        return st

    def _check_writable(self, inode_parent):
        """Return the directory for inode_parent if entries may be added or removed.

        Raises ENOENT if the inode is unknown, ENOTDIR if it is not a
        directory, and EPERM if it is read-only or not backed by a collection.
        """
        if inode_parent in self.inodes:
            p = self.inodes[inode_parent]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        if not p.writable():
            raise llfuse.FUSEError(errno.EPERM)

        if not isinstance(p, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        return p

    @catch_exceptions
    def create(self, inode_parent, name, mode, flags, ctx):
        """Create and open file name in inode_parent; return (fh, attributes)."""
        p = self._check_writable(inode_parent)

        with llfuse.lock_released:
            p.collection.open(name, "w")

        # The file entry should have been implicitly created by callback.
        f = p[name]
        fh = self._new_fh()
        self._filehandles[fh] = FileHandle(fh, f)
        self.inodes.touch(p)

        f.inc_ref()
        return (fh, self.getattr(f.inode))

    @catch_exceptions
    def mkdir(self, inode_parent, name, mode, ctx):
        """Create subdirectory name in inode_parent and return its attributes."""
        p = self._check_writable(inode_parent)

        with llfuse.lock_released:
            p.collection.mkdirs(name)

        # The dir entry should have been implicitly created by callback.
        d = p[name]

        d.inc_ref()
        return self.getattr(d.inode)

    @catch_exceptions
    def unlink(self, inode_parent, name):
        """Remove entry name from inode_parent's collection."""
        p = self._check_writable(inode_parent)

        with llfuse.lock_released:
            p.collection.remove(name)

    def rmdir(self, inode_parent, name):
        self.unlink(inode_parent, name)

    @catch_exceptions
    def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
        """Move name_old in one collection directory to name_new in another, overwriting."""
        src = self._check_writable(inode_parent_old)
        dest = self._check_writable(inode_parent_new)

        with llfuse.lock_released:
            dest.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
            dest.flush()
            src.flush()

    @catch_exceptions
    def flush(self, fh):
        """Flush the object behind handle fh, if the handle is open."""
        if fh in self._filehandles:
            self._filehandles[fh].flush()

    def fsync(self, fh, datasync):
        self.flush(fh)

    def fsyncdir(self, fh, datasync):
        self.flush(fh)
529     def fsyncdir(self, fh, datasync):
530         self.flush(fh)