3198: Fix test deadlock. Track inode lifetimes.
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24 import collections
25 import functools
26
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
29
30 _logger = logging.getLogger('arvados.arvados_fuse')
31
32 log_handler = logging.StreamHandler()
33 llogger = logging.getLogger('llfuse')
34 llogger.addHandler(log_handler)
35 llogger.setLevel(logging.DEBUG)
36
37 class Handle(object):
38     """Connects a numeric file handle to a File or Directory object that has
39     been opened by the client."""
40
41     def __init__(self, fh, obj):
42         self.fh = fh
43         self.obj = obj
44         self.obj.inc_use()
45
46     def release(self):
47         self.obj.dec_use()
48
49     def flush(self):
50         with llfuse.lock_released:
51             return self.obj.flush()
52
53
54 class FileHandle(Handle):
55     """Connects a numeric file handle to a File  object that has
56     been opened by the client."""
57     pass
58
59
60 class DirectoryHandle(Handle):
61     """Connects a numeric file handle to a Directory object that has
62     been opened by the client."""
63
64     def __init__(self, fh, dirobj, entries):
65         super(DirectoryHandle, self).__init__(fh, dirobj)
66         self.entries = entries
67
68
69 class InodeCache(object):
70     def __init__(self, cap):
71         self._entries = collections.OrderedDict()
72         self._counter = itertools.count(1)
73         self.cap = cap
74         self._total = 0
75
76     def _remove(self, obj, clear):
77         if clear and not obj.clear():
78             _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
79             return False
80         self._total -= obj._cache_size
81         del self._entries[obj._cache_priority]
82         _logger.debug("Cleared %s total now %i", obj, self._total)
83         return True
84
85     def cap_cache(self):
86         _logger.debug("total is %i cap is %i", self._total, self.cap)
87         if self._total > self.cap:
88             need_gc = False
89             for key in list(self._entries.keys()):
90                 if self._total < self.cap or len(self._entries) < 4:
91                     break
92                 self._remove(self._entries[key], True)
93
94
95     def manage(self, obj):
96         if obj.persisted():
97             obj._cache_priority = next(self._counter)
98             obj._cache_size = obj.objsize()
99             self._entries[obj._cache_priority] = obj
100             self._total += obj.objsize()
101             _logger.debug("Managing %s total now %i", obj, self._total)
102             self.cap_cache()
103
104     def touch(self, obj):
105         if obj.persisted():
106             if obj._cache_priority in self._entries:
107                 self._remove(obj, False)
108             self.manage(obj)
109             _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
110
111     def unmanage(self, obj):
112         if obj.persisted() and obj._cache_priority in self._entries:
113             self._remove(obj, True)
114
115
116 class Inodes(object):
117     """Manage the set of inodes.  This is the mapping from a numeric id
118     to a concrete File or Directory object"""
119
120     def __init__(self, inode_cache=256*1024*1024):
121         self._entries = {}
122         self._counter = itertools.count(llfuse.ROOT_INODE)
123         self._obj_cache = InodeCache(cap=inode_cache)
124
125     def __getitem__(self, item):
126         return self._entries[item]
127
128     def __setitem__(self, key, item):
129         self._entries[key] = item
130
131     def __iter__(self):
132         return self._entries.iterkeys()
133
134     def items(self):
135         return self._entries.items()
136
137     def __contains__(self, k):
138         return k in self._entries
139
140     def touch(self, entry):
141         entry._atime = time.time()
142         self._obj_cache.touch(entry)
143
144     def cap_cache(self):
145         self._obj_cache.cap_cache()
146
147     def add_entry(self, entry):
148         entry.inode = next(self._counter)
149         self._entries[entry.inode] = entry
150         self._obj_cache.manage(entry)
151         return entry
152
153     def del_entry(self, entry):
154         if entry.ref_count == 0:
155             _logger.warn("Deleting inode %i", entry.inode)
156             self._obj_cache.unmanage(entry)
157             llfuse.invalidate_inode(entry.inode)
158             del self._entries[entry.inode]
159         else:
160             _logger.warn("Inode %i has refcount %i", entry.inode, entry.ref_count)
161             entry.dead = True
162
163 def catch_exceptions(orig_func):
164     @functools.wraps(orig_func)
165     def catch_exceptions_wrapper(self, *args, **kwargs):
166         try:
167             return orig_func(self, *args, **kwargs)
168         except llfuse.FUSEError:
169             raise
170         except EnvironmentError as e:
171             raise llfuse.FUSEError(e.errno)
172         except:
173             _logger.exception("Unhandled exception during FUSE operation")
174             raise llfuse.FUSEError(errno.EIO)
175
176     return catch_exceptions_wrapper
177
178
179 class Operations(llfuse.Operations):
180     """This is the main interface with llfuse.
181
182     The methods on this object are called by llfuse threads to service FUSE
183     events to query and read from the file system.
184
185     llfuse has its own global lock which is acquired before calling a request handler,
186     so request handlers do not run concurrently unless the lock is explicitly released
187     using 'with llfuse.lock_released:'
188
189     """
190
191     def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000, num_retries=7):
192         super(Operations, self).__init__()
193
194         self.inodes = Inodes(inode_cache)
195         self.uid = uid
196         self.gid = gid
197         self.encoding = encoding
198
199         # dict of inode to filehandle
200         self._filehandles = {}
201         self._filehandles_counter = 1
202
203         # Other threads that need to wait until the fuse driver
204         # is fully initialized should wait() on this event object.
205         self.initlock = threading.Event()
206
207         self.num_retries = num_retries
208
209     def init(self):
210         # Allow threads that are waiting for the driver to be finished
211         # initializing to continue
212         self.initlock.set()
213
214     def access(self, inode, mode, ctx):
215         return True
216
217     @catch_exceptions
218     def getattr(self, inode):
219         if inode not in self.inodes:
220             raise llfuse.FUSEError(errno.ENOENT)
221
222         e = self.inodes[inode]
223
224         entry = llfuse.EntryAttributes()
225         entry.st_ino = inode
226         entry.generation = 0
227         entry.entry_timeout = 300
228         entry.attr_timeout = 300
229
230         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
231         if isinstance(e, Directory):
232             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
233         else:
234             entry.st_mode |= stat.S_IFREG
235             if isinstance(e, FuseArvadosFile):
236                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
237
238         if e.writable():
239             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
240
241         entry.st_nlink = 1
242         entry.st_uid = self.uid
243         entry.st_gid = self.gid
244         entry.st_rdev = 0
245
246         entry.st_size = e.size()
247
248         entry.st_blksize = 512
249         entry.st_blocks = (e.size()/512)+1
250         entry.st_atime = int(e.atime())
251         entry.st_mtime = int(e.mtime())
252         entry.st_ctime = int(e.mtime())
253
254         return entry
255
256     @catch_exceptions
257     def lookup(self, parent_inode, name):
258         name = unicode(name, self.encoding)
259         inode = None
260
261         if name == '.':
262             inode = parent_inode
263         else:
264             if parent_inode in self.inodes:
265                 p = self.inodes[parent_inode]
266                 if name == '..':
267                     inode = p.parent_inode
268                 elif isinstance(p, Directory) and name in p:
269                     inode = p[name].inode
270
271         if inode != None:
272             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
273                       parent_inode, name, inode)
274             self.inodes[inode].inc_ref()
275             return self.getattr(inode)
276         else:
277             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
278                       parent_inode, name)
279             raise llfuse.FUSEError(errno.ENOENT)
280
281     @catch_exceptions
282     def forget(self, inodes):
283         for inode, nlookup in inodes:
284             _logger.debug("arv-mount forget: %i %i", inode, nlookup)
285             ent = self.inodes[inode]
286             if ent.dec_ref(nlookup) == 0 and ent.dead:
287                 self.inodes.del_entry(ent)
288
289     @catch_exceptions
290     def open(self, inode, flags):
291         if inode in self.inodes:
292             p = self.inodes[inode]
293         else:
294             raise llfuse.FUSEError(errno.ENOENT)
295
296         if isinstance(p, Directory):
297             raise llfuse.FUSEError(errno.EISDIR)
298
299         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
300             raise llfuse.FUSEError(errno.EPERM)
301
302         fh = self._filehandles_counter
303         self._filehandles_counter += 1
304         self._filehandles[fh] = FileHandle(fh, p)
305         self.inodes.touch(p)
306         return fh
307
308     @catch_exceptions
309     def read(self, fh, off, size):
310         _logger.debug("arv-mount read %i %i %i", fh, off, size)
311         if fh in self._filehandles:
312             handle = self._filehandles[fh]
313         else:
314             raise llfuse.FUSEError(errno.EBADF)
315
316         self.inodes.touch(handle.obj)
317
318         try:
319             with llfuse.lock_released:
320                 return handle.obj.readfrom(off, size, self.num_retries)
321         except arvados.errors.NotFoundError as e:
322             _logger.warning("Block not found: " + str(e))
323             raise llfuse.FUSEError(errno.EIO)
324
325     @catch_exceptions
326     def write(self, fh, off, buf):
327         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
328         if fh in self._filehandles:
329             handle = self._filehandles[fh]
330         else:
331             raise llfuse.FUSEError(errno.EBADF)
332
333         if not handle.obj.writable():
334             raise llfuse.FUSEError(errno.EPERM)
335
336         self.inodes.touch(handle.obj)
337
338         with llfuse.lock_released:
339             return handle.obj.writeto(off, buf, self.num_retries)
340
341     @catch_exceptions
342     def release(self, fh):
343         if fh in self._filehandles:
344             try:
345                 self._filehandles[fh].flush()
346             except EnvironmentError as e:
347                 raise llfuse.FUSEError(e.errno)
348             except Exception:
349                 _logger.exception("Flush error")
350             self._filehandles[fh].release()
351             del self._filehandles[fh]
352         self.inodes.cap_cache()
353
354     def releasedir(self, fh):
355         self.release(fh)
356
357     @catch_exceptions
358     def opendir(self, inode):
359         _logger.debug("arv-mount opendir: inode %i", inode)
360
361         if inode in self.inodes:
362             p = self.inodes[inode]
363         else:
364             raise llfuse.FUSEError(errno.ENOENT)
365
366         if not isinstance(p, Directory):
367             raise llfuse.FUSEError(errno.ENOTDIR)
368
369         fh = self._filehandles_counter
370         self._filehandles_counter += 1
371         if p.parent_inode in self.inodes:
372             parent = self.inodes[p.parent_inode]
373         else:
374             raise llfuse.FUSEError(errno.EIO)
375
376         # update atime
377         self.inodes.touch(p)
378
379         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
380         return fh
381
382     @catch_exceptions
383     def readdir(self, fh, off):
384         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
385
386         if fh in self._filehandles:
387             handle = self._filehandles[fh]
388         else:
389             raise llfuse.FUSEError(errno.EBADF)
390
391         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
392
393         e = off
394         while e < len(handle.entries):
395             if handle.entries[e][1].inode in self.inodes:
396                 try:
397                     yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
398                 except UnicodeEncodeError:
399                     pass
400             e += 1
401
402     @catch_exceptions
403     def statfs(self):
404         st = llfuse.StatvfsData()
405         st.f_bsize = 64 * 1024
406         st.f_blocks = 0
407         st.f_files = 0
408
409         st.f_bfree = 0
410         st.f_bavail = 0
411
412         st.f_ffree = 0
413         st.f_favail = 0
414
415         st.f_frsize = 0
416         return st
417
418     def _check_writable(self, inode_parent):
419         if inode_parent in self.inodes:
420             p = self.inodes[inode_parent]
421         else:
422             raise llfuse.FUSEError(errno.ENOENT)
423
424         if not isinstance(p, Directory):
425             raise llfuse.FUSEError(errno.ENOTDIR)
426
427         if not p.writable():
428             raise llfuse.FUSEError(errno.EPERM)
429
430         if not isinstance(p, CollectionDirectoryBase):
431             raise llfuse.FUSEError(errno.EPERM)
432
433         return p
434
435     @catch_exceptions
436     def create(self, inode_parent, name, mode, flags, ctx):
437         p = self._check_writable(inode_parent)
438
439         with llfuse.lock_released:
440             p.collection.open(name, "w")
441
442         # The file entry should have been implicitly created by callback.
443         f = p[name]
444         fh = self._filehandles_counter
445         self._filehandles_counter += 1
446         self._filehandles[fh] = FileHandle(fh, f)
447         self.inodes.touch(p)
448
449         f.inc_ref()
450         return (fh, self.getattr(f.inode))
451
452     @catch_exceptions
453     def mkdir(self, inode_parent, name, mode, ctx):
454         p = self._check_writable(inode_parent)
455
456         with llfuse.lock_released:
457             p.collection.mkdirs(name)
458
459         # The dir entry should have been implicitly created by callback.
460         d = p[name]
461
462         d.inc_ref()
463         return self.getattr(d.inode)
464
465     @catch_exceptions
466     def unlink(self, inode_parent, name):
467         p = self._check_writable(inode_parent)
468
469         with llfuse.lock_released:
470             p.collection.remove(name)
471
472     def rmdir(self, inode_parent, name):
473         self.unlink(inode_parent, name)
474
475     # @catch_exceptions
476     # def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
477     #     src = self._check_writable(inode_parent_old)
478     #     dest = self._check_writable(inode_parent_new)
479     #
480     #     with llfuse.lock_released:
481     #         dest.collection.copy(name_old, name_new, source_collection=src.collection, overwrite=True)
482     #         src.collection.remove(name_old)
483
484     @catch_exceptions
485     def flush(self, fh):
486         if fh in self._filehandles:
487             self._filehandles[fh].flush()
488
489     def fsync(self, fh, datasync):
490         self.flush(fh)
491
492     def fsyncdir(self, fh, datasync):
493         self.flush(fh)