3198: writeto() notifies with WRITE instead of MOD. WRITE doesn't invalidate the...
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24 import collections
25 import functools
26
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
29
30 _logger = logging.getLogger('arvados.arvados_fuse')
31
32 log_handler = logging.StreamHandler()
33 llogger = logging.getLogger('llfuse')
34 llogger.addHandler(log_handler)
35 llogger.setLevel(logging.DEBUG)
36
37 class Handle(object):
38     """Connects a numeric file handle to a File or Directory object that has
39     been opened by the client."""
40
41     def __init__(self, fh, obj):
42         self.fh = fh
43         self.obj = obj
44         self.obj.inc_use()
45
46     def release(self):
47         self.obj.dec_use()
48
49     def flush(self):
50         return self.obj.flush()
51
52
53 class FileHandle(Handle):
54     """Connects a numeric file handle to a File  object that has
55     been opened by the client."""
56     pass
57
58
59 class DirectoryHandle(Handle):
60     """Connects a numeric file handle to a Directory object that has
61     been opened by the client."""
62
63     def __init__(self, fh, dirobj, entries):
64         super(DirectoryHandle, self).__init__(fh, dirobj)
65         self.entries = entries
66
67
68 class InodeCache(object):
69     def __init__(self, cap, min_entries=4):
70         self._entries = collections.OrderedDict()
71         self._by_uuid = {}
72         self._counter = itertools.count(0)
73         self.cap = cap
74         self._total = 0
75         self.min_entries = min_entries
76
77     def total(self):
78         return self._total
79
80     def _remove(self, obj, clear):
81         if clear and not obj.clear():
82             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
83             return False
84         self._total -= obj.cache_size
85         del self._entries[obj.cache_priority]
86         if obj.cache_uuid:
87             del self._by_uuid[obj.cache_uuid]
88             obj.cache_uuid = None
89         if clear:
90             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
91         return True
92
93     def cap_cache(self):
94         #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
95         if self._total > self.cap:
96             for key in list(self._entries.keys()):
97                 if self._total < self.cap or len(self._entries) < self.min_entries:
98                     break
99                 self._remove(self._entries[key], True)
100
101     def manage(self, obj):
102         if obj.persisted():
103             obj.cache_priority = next(self._counter)
104             obj.cache_size = obj.objsize()
105             self._entries[obj.cache_priority] = obj
106             obj.cache_uuid = obj.uuid()
107             if obj.cache_uuid:
108                 self._by_uuid[obj.cache_uuid] = obj
109             self._total += obj.objsize()
110             _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
111             self.cap_cache()
112         else:
113             obj.cache_priority = None
114
115     def touch(self, obj):
116         if obj.persisted():
117             if obj.cache_priority in self._entries:
118                 self._remove(obj, False)
119             self.manage(obj)
120
121     def unmanage(self, obj):
122         if obj.persisted() and obj.cache_priority in self._entries:
123             self._remove(obj, True)
124
125     def find(self, uuid):
126         return self._by_uuid.get(uuid)
127
128 class Inodes(object):
129     """Manage the set of inodes.  This is the mapping from a numeric id
130     to a concrete File or Directory object"""
131
132     def __init__(self, inode_cache):
133         self._entries = {}
134         self._counter = itertools.count(llfuse.ROOT_INODE)
135         self.inode_cache = inode_cache
136
137     def __getitem__(self, item):
138         return self._entries[item]
139
140     def __setitem__(self, key, item):
141         self._entries[key] = item
142
143     def __iter__(self):
144         return self._entries.iterkeys()
145
146     def items(self):
147         return self._entries.items()
148
149     def __contains__(self, k):
150         return k in self._entries
151
152     def touch(self, entry):
153         entry._atime = time.time()
154         self.inode_cache.touch(entry)
155
156     def add_entry(self, entry):
157         entry.inode = next(self._counter)
158         if entry.inode == llfuse.ROOT_INODE:
159             entry.inc_ref()
160         self._entries[entry.inode] = entry
161         self.inode_cache.manage(entry)
162         return entry
163
164     def del_entry(self, entry):
165         if entry.ref_count == 0:
166             _logger.debug("Deleting inode %i", entry.inode)
167             self.inode_cache.unmanage(entry)
168             llfuse.invalidate_inode(entry.inode)
169             del self._entries[entry.inode]
170             entry.inode = None
171         else:
172             entry.dead = True
173             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
174
175 def catch_exceptions(orig_func):
176     @functools.wraps(orig_func)
177     def catch_exceptions_wrapper(self, *args, **kwargs):
178         try:
179             return orig_func(self, *args, **kwargs)
180         except llfuse.FUSEError:
181             raise
182         except EnvironmentError as e:
183             raise llfuse.FUSEError(e.errno)
184         except:
185             _logger.exception("Unhandled exception during FUSE operation")
186             raise llfuse.FUSEError(errno.EIO)
187
188     return catch_exceptions_wrapper
189
190
191 class Operations(llfuse.Operations):
192     """This is the main interface with llfuse.
193
194     The methods on this object are called by llfuse threads to service FUSE
195     events to query and read from the file system.
196
197     llfuse has its own global lock which is acquired before calling a request handler,
198     so request handlers do not run concurrently unless the lock is explicitly released
199     using 'with llfuse.lock_released:'
200
201     """
202
203     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
204         super(Operations, self).__init__()
205
206         if not inode_cache:
207             inode_cache = InodeCache(cap=256*1024*1024)
208         self.inodes = Inodes(inode_cache)
209         self.uid = uid
210         self.gid = gid
211         self.encoding = encoding
212
213         # dict of inode to filehandle
214         self._filehandles = {}
215         self._filehandles_counter = itertools.count(0)
216
217         # Other threads that need to wait until the fuse driver
218         # is fully initialized should wait() on this event object.
219         self.initlock = threading.Event()
220
221         self.num_retries = num_retries
222
223         self.events = None
224
225     def init(self):
226         # Allow threads that are waiting for the driver to be finished
227         # initializing to continue
228         self.initlock.set()
229
230     def destroy(self):
231         if self.events:
232             self.events.close()
233
234     def access(self, inode, mode, ctx):
235         return True
236
237     def listen_for_events(self, api_client):
238         self.event = arvados.events.subscribe(api_client,
239                                  [["event_type", "in", ["create", "update", "delete"]]],
240                                  self.on_event)
241
242     def on_event(self, ev):
243         if 'event_type' in ev:
244             with llfuse.lock:
245                 item = self.inodes.inode_cache.find(ev["object_uuid"])
246                 if item is not None:
247                     item.invalidate()
248                     item.update()
249
250                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
251                 if itemparent is not None:
252                     itemparent.invalidate()
253                     itemparent.update()
254
255     @catch_exceptions
256     def getattr(self, inode):
257         if inode not in self.inodes:
258             raise llfuse.FUSEError(errno.ENOENT)
259
260         e = self.inodes[inode]
261
262         entry = llfuse.EntryAttributes()
263         entry.st_ino = inode
264         entry.generation = 0
265         entry.entry_timeout = 300
266         entry.attr_timeout = 300
267
268         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
269         if isinstance(e, Directory):
270             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
271         else:
272             entry.st_mode |= stat.S_IFREG
273             if isinstance(e, FuseArvadosFile):
274                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
275
276         if e.writable():
277             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
278
279         entry.st_nlink = 1
280         entry.st_uid = self.uid
281         entry.st_gid = self.gid
282         entry.st_rdev = 0
283
284         entry.st_size = e.size()
285
286         entry.st_blksize = 512
287         entry.st_blocks = (e.size()/512)+1
288         entry.st_atime = int(e.atime())
289         entry.st_mtime = int(e.mtime())
290         entry.st_ctime = int(e.mtime())
291
292         return entry
293
294     @catch_exceptions
295     def setattr(self, inode, attr):
296         entry = self.getattr(inode)
297
298         e = self.inodes[inode]
299
300         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
301             with llfuse.lock_released:
302                 e.arvfile.truncate(attr.st_size)
303                 entry.st_size = e.arvfile.size()
304
305         return entry
306
307     @catch_exceptions
308     def lookup(self, parent_inode, name):
309         name = unicode(name, self.encoding)
310         inode = None
311
312         if name == '.':
313             inode = parent_inode
314         else:
315             if parent_inode in self.inodes:
316                 p = self.inodes[parent_inode]
317                 if name == '..':
318                     inode = p.parent_inode
319                 elif isinstance(p, Directory) and name in p:
320                     inode = p[name].inode
321
322         if inode != None:
323             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
324                       parent_inode, name, inode)
325             self.inodes[inode].inc_ref()
326             return self.getattr(inode)
327         else:
328             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
329                       parent_inode, name)
330             raise llfuse.FUSEError(errno.ENOENT)
331
332     @catch_exceptions
333     def forget(self, inodes):
334         for inode, nlookup in inodes:
335             ent = self.inodes[inode]
336             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
337             if ent.dec_ref(nlookup) == 0 and ent.dead:
338                 self.inodes.del_entry(ent)
339
340     @catch_exceptions
341     def open(self, inode, flags):
342         if inode in self.inodes:
343             p = self.inodes[inode]
344         else:
345             raise llfuse.FUSEError(errno.ENOENT)
346
347         if isinstance(p, Directory):
348             raise llfuse.FUSEError(errno.EISDIR)
349
350         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
351             raise llfuse.FUSEError(errno.EPERM)
352
353         fh = next(self._filehandles_counter)
354         self._filehandles[fh] = FileHandle(fh, p)
355         self.inodes.touch(p)
356         return fh
357
358     @catch_exceptions
359     def read(self, fh, off, size):
360         _logger.debug("arv-mount read %i %i %i", fh, off, size)
361         if fh in self._filehandles:
362             handle = self._filehandles[fh]
363         else:
364             raise llfuse.FUSEError(errno.EBADF)
365
366         self.inodes.touch(handle.obj)
367
368         try:
369             return handle.obj.readfrom(off, size, self.num_retries)
370         except arvados.errors.NotFoundError as e:
371             _logger.error("Block not found: " + str(e))
372             raise llfuse.FUSEError(errno.EIO)
373
374     @catch_exceptions
375     def write(self, fh, off, buf):
376         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
377         if fh in self._filehandles:
378             handle = self._filehandles[fh]
379         else:
380             raise llfuse.FUSEError(errno.EBADF)
381
382         if not handle.obj.writable():
383             raise llfuse.FUSEError(errno.EPERM)
384
385         self.inodes.touch(handle.obj)
386
387         return handle.obj.writeto(off, buf, self.num_retries)
388
389     @catch_exceptions
390     def release(self, fh):
391         if fh in self._filehandles:
392             try:
393                 self._filehandles[fh].flush()
394             except EnvironmentError as e:
395                 raise llfuse.FUSEError(e.errno)
396             except Exception:
397                 _logger.exception("Flush error")
398             self._filehandles[fh].release()
399             del self._filehandles[fh]
400         self.inodes.inode_cache.cap_cache()
401
402     def releasedir(self, fh):
403         self.release(fh)
404
405     @catch_exceptions
406     def opendir(self, inode):
407         _logger.debug("arv-mount opendir: inode %i", inode)
408
409         if inode in self.inodes:
410             p = self.inodes[inode]
411         else:
412             raise llfuse.FUSEError(errno.ENOENT)
413
414         if not isinstance(p, Directory):
415             raise llfuse.FUSEError(errno.ENOTDIR)
416
417         fh = next(self._filehandles_counter)
418         if p.parent_inode in self.inodes:
419             parent = self.inodes[p.parent_inode]
420         else:
421             raise llfuse.FUSEError(errno.EIO)
422
423         # update atime
424         self.inodes.touch(p)
425
426         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
427         return fh
428
429     @catch_exceptions
430     def readdir(self, fh, off):
431         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
432
433         if fh in self._filehandles:
434             handle = self._filehandles[fh]
435         else:
436             raise llfuse.FUSEError(errno.EBADF)
437
438         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
439
440         e = off
441         while e < len(handle.entries):
442             if handle.entries[e][1].inode in self.inodes:
443                 try:
444                     yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
445                 except UnicodeEncodeError:
446                     pass
447             e += 1
448
449     @catch_exceptions
450     def statfs(self):
451         st = llfuse.StatvfsData()
452         st.f_bsize = 64 * 1024
453         st.f_blocks = 0
454         st.f_files = 0
455
456         st.f_bfree = 0
457         st.f_bavail = 0
458
459         st.f_ffree = 0
460         st.f_favail = 0
461
462         st.f_frsize = 0
463         return st
464
465     def _check_writable(self, inode_parent):
466         if inode_parent in self.inodes:
467             p = self.inodes[inode_parent]
468         else:
469             raise llfuse.FUSEError(errno.ENOENT)
470
471         if not isinstance(p, Directory):
472             raise llfuse.FUSEError(errno.ENOTDIR)
473
474         if not p.writable():
475             raise llfuse.FUSEError(errno.EPERM)
476
477         return p
478
479     @catch_exceptions
480     def create(self, inode_parent, name, mode, flags, ctx):
481         p = self._check_writable(inode_parent)
482         p.create(name)
483
484         # The file entry should have been implicitly created by callback.
485         f = p[name]
486         fh = next(self._filehandles_counter)
487         self._filehandles[fh] = FileHandle(fh, f)
488         self.inodes.touch(p)
489
490         f.inc_ref()
491         return (fh, self.getattr(f.inode))
492
493     @catch_exceptions
494     def mkdir(self, inode_parent, name, mode, ctx):
495         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
496
497         p = self._check_writable(inode_parent)
498         p.mkdir(name)
499
500         # The dir entry should have been implicitly created by callback.
501         d = p[name]
502
503         d.inc_ref()
504         return self.getattr(d.inode)
505
506     @catch_exceptions
507     def unlink(self, inode_parent, name):
508         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
509         p = self._check_writable(inode_parent)
510         p.unlink(name)
511
512     @catch_exceptions
513     def rmdir(self, inode_parent, name):
514         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
515         p = self._check_writable(inode_parent)
516         p.rmdir(name)
517
518     @catch_exceptions
519     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
520         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
521         src = self._check_writable(inode_parent_old)
522         dest = self._check_writable(inode_parent_new)
523         dest.rename(name_old, name_new, src)
524
525     @catch_exceptions
526     def flush(self, fh):
527         if fh in self._filehandles:
528             self._filehandles[fh].flush()
529
530     def fsync(self, fh, datasync):
531         self.flush(fh)
532
533     def fsyncdir(self, fh, datasync):
534         self.flush(fh)