Merge branch '5992-keep-proxy-ignore-locator-hints' closes #5992
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24 import collections
25
26 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
27 from fusefile import StreamReaderFile, StringFile
28
29 _logger = logging.getLogger('arvados.arvados_fuse')
30
31
32 class FileHandle(object):
33     """Connects a numeric file handle to a File object that has
34     been opened by the client."""
35
36     def __init__(self, fh, fileobj):
37         self.fh = fh
38         self.fileobj = fileobj
39         self.fileobj.inc_use()
40
41     def release(self):
42         self.fileobj.dec_use()
43
44
45 class DirectoryHandle(object):
46     """Connects a numeric file handle to a Directory object that has
47     been opened by the client."""
48
49     def __init__(self, fh, dirobj, entries):
50         self.fh = fh
51         self.entries = entries
52         self.dirobj = dirobj
53         self.dirobj.inc_use()
54
55     def release(self):
56         self.dirobj.dec_use()
57
58
59 class InodeCache(object):
60     def __init__(self, cap, min_entries=4):
61         self._entries = collections.OrderedDict()
62         self._counter = itertools.count(1)
63         self.cap = cap
64         self._total = 0
65         self.min_entries = min_entries
66
67     def total(self):
68         return self._total
69
70     def _remove(self, obj, clear):
71         if clear and not obj.clear():
72             _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
73             return False
74         self._total -= obj.cache_size
75         del self._entries[obj.cache_priority]
76         _logger.debug("Cleared %s total now %i", obj, self._total)
77         return True
78
79     def cap_cache(self):
80         _logger.debug("total is %i cap is %i", self._total, self.cap)
81         if self._total > self.cap:
82             for key in list(self._entries.keys()):
83                 if self._total < self.cap or len(self._entries) < self.min_entries:
84                     break
85                 self._remove(self._entries[key], True)
86
87     def manage(self, obj):
88         if obj.persisted():
89             obj.cache_priority = next(self._counter)
90             obj.cache_size = obj.objsize()
91             self._entries[obj.cache_priority] = obj
92             self._total += obj.objsize()
93             _logger.debug("Managing %s total now %i", obj, self._total)
94             self.cap_cache()
95
96     def touch(self, obj):
97         if obj.persisted():
98             if obj.cache_priority in self._entries:
99                 self._remove(obj, False)
100             self.manage(obj)
101             _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
102
103     def unmanage(self, obj):
104         if obj.persisted() and obj.cache_priority in self._entries:
105             self._remove(obj, True)
106
107 class Inodes(object):
108     """Manage the set of inodes.  This is the mapping from a numeric id
109     to a concrete File or Directory object"""
110
111     def __init__(self, inode_cache):
112         self._entries = {}
113         self._counter = itertools.count(llfuse.ROOT_INODE)
114         self.inode_cache = inode_cache
115
116     def __getitem__(self, item):
117         return self._entries[item]
118
119     def __setitem__(self, key, item):
120         self._entries[key] = item
121
122     def __iter__(self):
123         return self._entries.iterkeys()
124
125     def items(self):
126         return self._entries.items()
127
128     def __contains__(self, k):
129         return k in self._entries
130
131     def touch(self, entry):
132         entry._atime = time.time()
133         self.inode_cache.touch(entry)
134
135     def add_entry(self, entry):
136         entry.inode = next(self._counter)
137         self._entries[entry.inode] = entry
138         self.inode_cache.manage(entry)
139         return entry
140
141     def del_entry(self, entry):
142         self.inode_cache.unmanage(entry)
143         llfuse.invalidate_inode(entry.inode)
144         del self._entries[entry.inode]
145
146
147 class Operations(llfuse.Operations):
148     """This is the main interface with llfuse.
149
150     The methods on this object are called by llfuse threads to service FUSE
151     events to query and read from the file system.
152
153     llfuse has its own global lock which is acquired before calling a request handler,
154     so request handlers do not run concurrently unless the lock is explicitly released
155     using 'with llfuse.lock_released:'
156
157     """
158
159     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None):
160         super(Operations, self).__init__()
161
162         if not inode_cache:
163             inode_cache = InodeCache(cap=256*1024*1024)
164         self.inodes = Inodes(inode_cache)
165         self.uid = uid
166         self.gid = gid
167         self.encoding = encoding
168
169         # dict of inode to filehandle
170         self._filehandles = {}
171         self._filehandles_counter = 1
172
173         # Other threads that need to wait until the fuse driver
174         # is fully initialized should wait() on this event object.
175         self.initlock = threading.Event()
176
177     def init(self):
178         # Allow threads that are waiting for the driver to be finished
179         # initializing to continue
180         self.initlock.set()
181
182     def access(self, inode, mode, ctx):
183         return True
184
185     def getattr(self, inode):
186         if inode not in self.inodes:
187             raise llfuse.FUSEError(errno.ENOENT)
188
189         e = self.inodes[inode]
190
191         entry = llfuse.EntryAttributes()
192         entry.st_ino = inode
193         entry.generation = 0
194         entry.entry_timeout = 300
195         entry.attr_timeout = 300
196
197         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
198         if isinstance(e, Directory):
199             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
200         elif isinstance(e, StreamReaderFile):
201             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
202         else:
203             entry.st_mode |= stat.S_IFREG
204
205         entry.st_nlink = 1
206         entry.st_uid = self.uid
207         entry.st_gid = self.gid
208         entry.st_rdev = 0
209
210         entry.st_size = e.size()
211
212         entry.st_blksize = 512
213         entry.st_blocks = (e.size()/512)+1
214         entry.st_atime = int(e.atime())
215         entry.st_mtime = int(e.mtime())
216         entry.st_ctime = int(e.mtime())
217
218         return entry
219
220     def lookup(self, parent_inode, name):
221         name = unicode(name, self.encoding)
222         _logger.debug("arv-mount lookup: parent_inode %i name %s",
223                       parent_inode, name)
224         inode = None
225
226         if name == '.':
227             inode = parent_inode
228         else:
229             if parent_inode in self.inodes:
230                 p = self.inodes[parent_inode]
231                 if name == '..':
232                     inode = p.parent_inode
233                 elif isinstance(p, Directory) and name in p:
234                     inode = p[name].inode
235
236         if inode != None:
237             return self.getattr(inode)
238         else:
239             raise llfuse.FUSEError(errno.ENOENT)
240
241     def open(self, inode, flags):
242         if inode in self.inodes:
243             p = self.inodes[inode]
244         else:
245             raise llfuse.FUSEError(errno.ENOENT)
246
247         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
248             raise llfuse.FUSEError(errno.EROFS)
249
250         if isinstance(p, Directory):
251             raise llfuse.FUSEError(errno.EISDIR)
252
253         fh = self._filehandles_counter
254         self._filehandles_counter += 1
255         self._filehandles[fh] = FileHandle(fh, p)
256         self.inodes.touch(p)
257         return fh
258
259     def read(self, fh, off, size):
260         _logger.debug("arv-mount read %i %i %i", fh, off, size)
261         if fh in self._filehandles:
262             handle = self._filehandles[fh]
263         else:
264             raise llfuse.FUSEError(errno.EBADF)
265
266         self.inodes.touch(handle.fileobj)
267
268         try:
269             with llfuse.lock_released:
270                 return handle.fileobj.readfrom(off, size)
271         except arvados.errors.NotFoundError as e:
272             _logger.warning("Block not found: " + str(e))
273             raise llfuse.FUSEError(errno.EIO)
274         except Exception:
275             _logger.exception()
276             raise llfuse.FUSEError(errno.EIO)
277
278     def release(self, fh):
279         if fh in self._filehandles:
280             self._filehandles[fh].release()
281             del self._filehandles[fh]
282         self.inodes.inode_cache.cap_cache()
283
284     def releasedir(self, fh):
285         self.release(fh)
286
287     def opendir(self, inode):
288         _logger.debug("arv-mount opendir: inode %i", inode)
289
290         if inode in self.inodes:
291             p = self.inodes[inode]
292         else:
293             raise llfuse.FUSEError(errno.ENOENT)
294
295         if not isinstance(p, Directory):
296             raise llfuse.FUSEError(errno.ENOTDIR)
297
298         fh = self._filehandles_counter
299         self._filehandles_counter += 1
300         if p.parent_inode in self.inodes:
301             parent = self.inodes[p.parent_inode]
302         else:
303             raise llfuse.FUSEError(errno.EIO)
304
305         # update atime
306         self.inodes.touch(p)
307
308         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
309         return fh
310
311
312     def readdir(self, fh, off):
313         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
314
315         if fh in self._filehandles:
316             handle = self._filehandles[fh]
317         else:
318             raise llfuse.FUSEError(errno.EBADF)
319
320         _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
321
322         e = off
323         while e < len(handle.entries):
324             if handle.entries[e][1].inode in self.inodes:
325                 try:
326                     yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
327                 except UnicodeEncodeError:
328                     pass
329             e += 1
330
331     def statfs(self):
332         st = llfuse.StatvfsData()
333         st.f_bsize = 64 * 1024
334         st.f_blocks = 0
335         st.f_files = 0
336
337         st.f_bfree = 0
338         st.f_bavail = 0
339
340         st.f_ffree = 0
341         st.f_favail = 0
342
343         st.f_frsize = 0
344         return st
345
346     # The llfuse documentation recommends only overloading functions that
347     # are actually implemented, as the default implementation will raise ENOSYS.
348     # However, there is a bug in the llfuse default implementation of create()
349     # "create() takes exactly 5 positional arguments (6 given)" which will crash
350     # arv-mount.
351     # The workaround is to implement it with the proper number of parameters,
352     # and then everything works out.
353     def create(self, inode_parent, name, mode, flags, ctx):
354         raise llfuse.FUSEError(errno.EROFS)