Merge branch '3198-inode-cache' into 3198-writable-fuse
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24 import collections
25
26 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
27 from fusefile import StringFile, FuseArvadosFile
28
29 _logger = logging.getLogger('arvados.arvados_fuse')
30
31
32 class FileHandle(object):
33     """Connects a numeric file handle to a File object that has
34     been opened by the client."""
35
36     def __init__(self, fh, fileobj):
37         self.fh = fh
38         self.fileobj = fileobj
39         self.fileobj.inc_use()
40
41     def release(self):
42         self.fileobj.dec_use()
43
44
45 class DirectoryHandle(object):
46     """Connects a numeric file handle to a Directory object that has
47     been opened by the client."""
48
49     def __init__(self, fh, dirobj, entries):
50         self.fh = fh
51         self.entries = entries
52         self.dirobj = dirobj
53         self.dirobj.inc_use()
54
55     def release(self):
56         self.dirobj.dec_use()
57
58
59 class InodeCache(object):
60     def __init__(self, cap):
61         self._entries = collections.OrderedDict()
62         self._counter = itertools.count(1)
63         self.cap = cap
64         self._total = 0
65
66     def _remove(self, obj, clear):
67         if clear and not obj.clear():
68             _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
69             return False
70         self._total -= obj._cache_size
71         del self._entries[obj._cache_priority]
72         _logger.debug("Cleared %s total now %i", obj, self._total)
73         return True
74
75     def cap_cache(self):
76         _logger.debug("total is %i cap is %i", self._total, self.cap)
77         if self._total > self.cap:
78             need_gc = False
79             for key in list(self._entries.keys()):
80                 if self._total < self.cap or len(self._entries) < 4:
81                     break
82                 self._remove(self._entries[key], True)
83
84
85     def manage(self, obj):
86         if obj.persisted():
87             obj._cache_priority = next(self._counter)
88             obj._cache_size = obj.objsize()
89             self._entries[obj._cache_priority] = obj
90             self._total += obj.objsize()
91             _logger.debug("Managing %s total now %i", obj, self._total)
92             self.cap_cache()
93
94     def touch(self, obj):
95         if obj.persisted():
96             if obj._cache_priority in self._entries:
97                 self._remove(obj, False)
98             self.manage(obj)
99             _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
100
101     def unmanage(self, obj):
102         if obj.persisted() and obj._cache_priority in self._entries:
103             self._remove(obj, True)
104
105 class Inodes(object):
106     """Manage the set of inodes.  This is the mapping from a numeric id
107     to a concrete File or Directory object"""
108
109     def __init__(self, inode_cache=256*1024*1024):
110         self._entries = {}
111         self._counter = itertools.count(llfuse.ROOT_INODE)
112         self._obj_cache = InodeCache(cap=inode_cache)
113
114     def __getitem__(self, item):
115         return self._entries[item]
116
117     def __setitem__(self, key, item):
118         self._entries[key] = item
119
120     def __iter__(self):
121         return self._entries.iterkeys()
122
123     def items(self):
124         return self._entries.items()
125
126     def __contains__(self, k):
127         return k in self._entries
128
129     def touch(self, entry):
130         entry._atime = time.time()
131         self._obj_cache.touch(entry)
132
133     def cap_cache(self):
134         self._obj_cache.cap_cache()
135
136     def add_entry(self, entry):
137         entry.inode = next(self._counter)
138         self._entries[entry.inode] = entry
139         self._obj_cache.manage(entry)
140         return entry
141
142     def del_entry(self, entry):
143         self._obj_cache.unmanage(entry)
144         llfuse.invalidate_inode(entry.inode)
145         del self._entries[entry.inode]
146
147
148 class Operations(llfuse.Operations):
149     """This is the main interface with llfuse.
150
151     The methods on this object are called by llfuse threads to service FUSE
152     events to query and read from the file system.
153
154     llfuse has its own global lock which is acquired before calling a request handler,
155     so request handlers do not run concurrently unless the lock is explicitly released
156     using 'with llfuse.lock_released:'
157
158     """
159
160     def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000, num_retries=7):
161         super(Operations, self).__init__()
162
163         self.inodes = Inodes(inode_cache)
164         self.uid = uid
165         self.gid = gid
166         self.encoding = encoding
167
168         # dict of inode to filehandle
169         self._filehandles = {}
170         self._filehandles_counter = 1
171
172         # Other threads that need to wait until the fuse driver
173         # is fully initialized should wait() on this event object.
174         self.initlock = threading.Event()
175
176         self.num_retries = num_retries
177
178     def init(self):
179         # Allow threads that are waiting for the driver to be finished
180         # initializing to continue
181         self.initlock.set()
182
183     def access(self, inode, mode, ctx):
184         return True
185
186     def getattr(self, inode):
187         if inode not in self.inodes:
188             raise llfuse.FUSEError(errno.ENOENT)
189
190         e = self.inodes[inode]
191
192         entry = llfuse.EntryAttributes()
193         entry.st_ino = inode
194         entry.generation = 0
195         entry.entry_timeout = 300
196         entry.attr_timeout = 300
197
198         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
199         if isinstance(e, Directory):
200             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
201         elif isinstance(e, FuseArvadosFile):
202             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
203         else:
204             entry.st_mode |= stat.S_IFREG
205
206         entry.st_nlink = 1
207         entry.st_uid = self.uid
208         entry.st_gid = self.gid
209         entry.st_rdev = 0
210
211         entry.st_size = e.size()
212
213         entry.st_blksize = 512
214         entry.st_blocks = (e.size()/512)+1
215         entry.st_atime = int(e.atime())
216         entry.st_mtime = int(e.mtime())
217         entry.st_ctime = int(e.mtime())
218
219         return entry
220
221     def lookup(self, parent_inode, name):
222         name = unicode(name, self.encoding)
223         _logger.debug("arv-mount lookup: parent_inode %i name %s",
224                       parent_inode, name)
225         inode = None
226
227         if name == '.':
228             inode = parent_inode
229         else:
230             if parent_inode in self.inodes:
231                 p = self.inodes[parent_inode]
232                 if name == '..':
233                     inode = p.parent_inode
234                 elif isinstance(p, Directory) and name in p:
235                     inode = p[name].inode
236
237         if inode != None:
238             return self.getattr(inode)
239         else:
240             raise llfuse.FUSEError(errno.ENOENT)
241
242     def open(self, inode, flags):
243         if inode in self.inodes:
244             p = self.inodes[inode]
245         else:
246             raise llfuse.FUSEError(errno.ENOENT)
247
248         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
249             raise llfuse.FUSEError(errno.EROFS)
250
251         if isinstance(p, Directory):
252             raise llfuse.FUSEError(errno.EISDIR)
253
254         fh = self._filehandles_counter
255         self._filehandles_counter += 1
256         self._filehandles[fh] = FileHandle(fh, p)
257         self.inodes.touch(p)
258         return fh
259
260     def read(self, fh, off, size):
261         _logger.debug("arv-mount read %i %i %i", fh, off, size)
262         if fh in self._filehandles:
263             handle = self._filehandles[fh]
264         else:
265             raise llfuse.FUSEError(errno.EBADF)
266
267         self.inodes.touch(handle.fileobj)
268
269         try:
270             with llfuse.lock_released:
271                 return handle.fileobj.readfrom(off, size, self.num_retries)
272         except arvados.errors.NotFoundError as e:
273             _logger.warning("Block not found: " + str(e))
274             raise llfuse.FUSEError(errno.EIO)
275         except Exception:
276             _logger.exception("Read error")
277             raise llfuse.FUSEError(errno.EIO)
278
279     def release(self, fh):
280         if fh in self._filehandles:
281             self._filehandles[fh].release()
282             del self._filehandles[fh]
283         self.inodes.cap_cache()
284
285     def releasedir(self, fh):
286         self.release(fh)
287
288     def opendir(self, inode):
289         _logger.debug("arv-mount opendir: inode %i", inode)
290
291         if inode in self.inodes:
292             p = self.inodes[inode]
293         else:
294             raise llfuse.FUSEError(errno.ENOENT)
295
296         if not isinstance(p, Directory):
297             raise llfuse.FUSEError(errno.ENOTDIR)
298
299         fh = self._filehandles_counter
300         self._filehandles_counter += 1
301         if p.parent_inode in self.inodes:
302             parent = self.inodes[p.parent_inode]
303         else:
304             raise llfuse.FUSEError(errno.EIO)
305
306         # update atime
307         self.inodes.touch(p)
308
309         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
310         return fh
311
312
313     def readdir(self, fh, off):
314         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
315
316         if fh in self._filehandles:
317             handle = self._filehandles[fh]
318         else:
319             raise llfuse.FUSEError(errno.EBADF)
320
321         _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
322
323         e = off
324         while e < len(handle.entries):
325             if handle.entries[e][1].inode in self.inodes:
326                 try:
327                     yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
328                 except UnicodeEncodeError:
329                     pass
330             e += 1
331
332     def statfs(self):
333         st = llfuse.StatvfsData()
334         st.f_bsize = 64 * 1024
335         st.f_blocks = 0
336         st.f_files = 0
337
338         st.f_bfree = 0
339         st.f_bavail = 0
340
341         st.f_ffree = 0
342         st.f_favail = 0
343
344         st.f_frsize = 0
345         return st
346
347     # The llfuse documentation recommends only overloading functions that
348     # are actually implemented, as the default implementation will raise ENOSYS.
349     # However, there is a bug in the llfuse default implementation of create()
350     # "create() takes exactly 5 positional arguments (6 given)" which will crash
351     # arv-mount.
352     # The workaround is to implement it with the proper number of parameters,
353     # and then everything works out.
354     def create(self, inode_parent, name, mode, flags, ctx):
355         raise llfuse.FUSEError(errno.EROFS)