3198: Can modify file, collection objects, changes are reflected in FUSE.
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24 import collections
25
26 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
27 from fusefile import StringFile, FuseArvadosFile
28
29 _logger = logging.getLogger('arvados.arvados_fuse')
30
31 # log_handler = logging.StreamHandler()
32 # llogger = logging.getLogger('llfuse')
33 # llogger.addHandler(log_handler)
34 # llogger.setLevel(logging.DEBUG)
35
36 class FileHandle(object):
37     """Connects a numeric file handle to a File object that has
38     been opened by the client."""
39
40     def __init__(self, fh, fileobj):
41         self.fh = fh
42         self.fileobj = fileobj
43         self.fileobj.inc_use()
44
45     def release(self):
46         self.fileobj.dec_use()
47
48
49 class DirectoryHandle(object):
50     """Connects a numeric file handle to a Directory object that has
51     been opened by the client."""
52
53     def __init__(self, fh, dirobj, entries):
54         self.fh = fh
55         self.entries = entries
56         self.dirobj = dirobj
57         self.dirobj.inc_use()
58
59     def release(self):
60         self.dirobj.dec_use()
61
62
63 class InodeCache(object):
64     def __init__(self, cap):
65         self._entries = collections.OrderedDict()
66         self._counter = itertools.count(1)
67         self.cap = cap
68         self._total = 0
69
70     def _remove(self, obj, clear):
71         if clear and not obj.clear():
72             _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
73             return False
74         self._total -= obj._cache_size
75         del self._entries[obj._cache_priority]
76         _logger.debug("Cleared %s total now %i", obj, self._total)
77         return True
78
79     def cap_cache(self):
80         _logger.debug("total is %i cap is %i", self._total, self.cap)
81         if self._total > self.cap:
82             need_gc = False
83             for key in list(self._entries.keys()):
84                 if self._total < self.cap or len(self._entries) < 4:
85                     break
86                 self._remove(self._entries[key], True)
87
88
89     def manage(self, obj):
90         if obj.persisted():
91             obj._cache_priority = next(self._counter)
92             obj._cache_size = obj.objsize()
93             self._entries[obj._cache_priority] = obj
94             self._total += obj.objsize()
95             _logger.debug("Managing %s total now %i", obj, self._total)
96             self.cap_cache()
97
98     def touch(self, obj):
99         if obj.persisted():
100             if obj._cache_priority in self._entries:
101                 self._remove(obj, False)
102             self.manage(obj)
103             _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
104
105     def unmanage(self, obj):
106         if obj.persisted() and obj._cache_priority in self._entries:
107             self._remove(obj, True)
108
109 class Inodes(object):
110     """Manage the set of inodes.  This is the mapping from a numeric id
111     to a concrete File or Directory object"""
112
113     def __init__(self, inode_cache=256*1024*1024):
114         self._entries = {}
115         self._counter = itertools.count(llfuse.ROOT_INODE)
116         self._obj_cache = InodeCache(cap=inode_cache)
117
118     def __getitem__(self, item):
119         return self._entries[item]
120
121     def __setitem__(self, key, item):
122         self._entries[key] = item
123
124     def __iter__(self):
125         return self._entries.iterkeys()
126
127     def items(self):
128         return self._entries.items()
129
130     def __contains__(self, k):
131         return k in self._entries
132
133     def touch(self, entry):
134         entry._atime = time.time()
135         self._obj_cache.touch(entry)
136
137     def cap_cache(self):
138         self._obj_cache.cap_cache()
139
140     def add_entry(self, entry):
141         entry.inode = next(self._counter)
142         self._entries[entry.inode] = entry
143         self._obj_cache.manage(entry)
144         return entry
145
146     def del_entry(self, entry):
147         self._obj_cache.unmanage(entry)
148         llfuse.invalidate_inode(entry.inode)
149         del self._entries[entry.inode]
150
151
152 class Operations(llfuse.Operations):
153     """This is the main interface with llfuse.
154
155     The methods on this object are called by llfuse threads to service FUSE
156     events to query and read from the file system.
157
158     llfuse has its own global lock which is acquired before calling a request handler,
159     so request handlers do not run concurrently unless the lock is explicitly released
160     using 'with llfuse.lock_released:'
161
162     """
163
164     def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000, num_retries=7):
165         super(Operations, self).__init__()
166
167         self.inodes = Inodes(inode_cache)
168         self.uid = uid
169         self.gid = gid
170         self.encoding = encoding
171
172         # dict of inode to filehandle
173         self._filehandles = {}
174         self._filehandles_counter = 1
175
176         # Other threads that need to wait until the fuse driver
177         # is fully initialized should wait() on this event object.
178         self.initlock = threading.Event()
179
180         self.num_retries = num_retries
181
182     def init(self):
183         # Allow threads that are waiting for the driver to be finished
184         # initializing to continue
185         self.initlock.set()
186
187     def access(self, inode, mode, ctx):
188         return True
189
190     def getattr(self, inode):
191         if inode not in self.inodes:
192             raise llfuse.FUSEError(errno.ENOENT)
193
194         e = self.inodes[inode]
195
196         entry = llfuse.EntryAttributes()
197         entry.st_ino = inode
198         entry.generation = 0
199         entry.entry_timeout = 300
200         entry.attr_timeout = 300
201
202         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
203         if isinstance(e, Directory):
204             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
205         else:
206             entry.st_mode |= stat.S_IFREG
207             if isinstance(e, FuseArvadosFile):
208                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
209
210         if e.writable():
211             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
212
213         entry.st_nlink = 1
214         entry.st_uid = self.uid
215         entry.st_gid = self.gid
216         entry.st_rdev = 0
217
218         entry.st_size = e.size()
219
220         entry.st_blksize = 512
221         entry.st_blocks = (e.size()/512)+1
222         entry.st_atime = int(e.atime())
223         entry.st_mtime = int(e.mtime())
224         entry.st_ctime = int(e.mtime())
225
226         return entry
227
228     def lookup(self, parent_inode, name):
229         name = unicode(name, self.encoding)
230         _logger.debug("arv-mount lookup: parent_inode %i name %s",
231                       parent_inode, name)
232         inode = None
233
234         if name == '.':
235             inode = parent_inode
236         else:
237             if parent_inode in self.inodes:
238                 p = self.inodes[parent_inode]
239                 if name == '..':
240                     inode = p.parent_inode
241                 elif isinstance(p, Directory) and name in p:
242                     inode = p[name].inode
243
244         if inode != None:
245             return self.getattr(inode)
246         else:
247             raise llfuse.FUSEError(errno.ENOENT)
248
249     def open(self, inode, flags):
250         if inode in self.inodes:
251             p = self.inodes[inode]
252         else:
253             raise llfuse.FUSEError(errno.ENOENT)
254
255         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
256             raise llfuse.FUSEError(errno.EROFS)
257
258         if isinstance(p, Directory):
259             raise llfuse.FUSEError(errno.EISDIR)
260
261         fh = self._filehandles_counter
262         self._filehandles_counter += 1
263         self._filehandles[fh] = FileHandle(fh, p)
264         self.inodes.touch(p)
265         return fh
266
267     def read(self, fh, off, size):
268         _logger.debug("arv-mount read %i %i %i", fh, off, size)
269         if fh in self._filehandles:
270             handle = self._filehandles[fh]
271         else:
272             raise llfuse.FUSEError(errno.EBADF)
273
274         self.inodes.touch(handle.fileobj)
275
276         try:
277             with llfuse.lock_released:
278                 return handle.fileobj.readfrom(off, size, self.num_retries)
279         except arvados.errors.NotFoundError as e:
280             _logger.warning("Block not found: " + str(e))
281             raise llfuse.FUSEError(errno.EIO)
282         except Exception:
283             _logger.exception("Read error")
284             raise llfuse.FUSEError(errno.EIO)
285
286     def release(self, fh):
287         if fh in self._filehandles:
288             self._filehandles[fh].release()
289             del self._filehandles[fh]
290         self.inodes.cap_cache()
291
292     def releasedir(self, fh):
293         self.release(fh)
294
295     def opendir(self, inode):
296         _logger.debug("arv-mount opendir: inode %i", inode)
297
298         if inode in self.inodes:
299             p = self.inodes[inode]
300         else:
301             raise llfuse.FUSEError(errno.ENOENT)
302
303         if not isinstance(p, Directory):
304             raise llfuse.FUSEError(errno.ENOTDIR)
305
306         fh = self._filehandles_counter
307         self._filehandles_counter += 1
308         if p.parent_inode in self.inodes:
309             parent = self.inodes[p.parent_inode]
310         else:
311             raise llfuse.FUSEError(errno.EIO)
312
313         # update atime
314         self.inodes.touch(p)
315
316         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
317         return fh
318
319
320     def readdir(self, fh, off):
321         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
322
323         if fh in self._filehandles:
324             handle = self._filehandles[fh]
325         else:
326             raise llfuse.FUSEError(errno.EBADF)
327
328         _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
329
330         e = off
331         while e < len(handle.entries):
332             if handle.entries[e][1].inode in self.inodes:
333                 try:
334                     yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
335                 except UnicodeEncodeError:
336                     pass
337             e += 1
338
339     def statfs(self):
340         st = llfuse.StatvfsData()
341         st.f_bsize = 64 * 1024
342         st.f_blocks = 0
343         st.f_files = 0
344
345         st.f_bfree = 0
346         st.f_bavail = 0
347
348         st.f_ffree = 0
349         st.f_favail = 0
350
351         st.f_frsize = 0
352         return st
353
354     # The llfuse documentation recommends only overloading functions that
355     # are actually implemented, as the default implementation will raise ENOSYS.
356     # However, there is a bug in the llfuse default implementation of create()
357     # "create() takes exactly 5 positional arguments (6 given)" which will crash
358     # arv-mount.
359     # The workaround is to implement it with the proper number of parameters,
360     # and then everything works out.
361     def create(self, inode_parent, name, mode, flags, ctx):
362         raise llfuse.FUSEError(errno.EROFS)