Merge branch 'master' into 3198-writable-fuse
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24 import collections
25 import functools
26
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
29
30 _logger = logging.getLogger('arvados.arvados_fuse')
31
32 log_handler = logging.StreamHandler()
33 llogger = logging.getLogger('llfuse')
34 llogger.addHandler(log_handler)
35 llogger.setLevel(logging.DEBUG)
36
37 class Handle(object):
38     """Connects a numeric file handle to a File or Directory object that has
39     been opened by the client."""
40
41     def __init__(self, fh, obj):
42         self.fh = fh
43         self.obj = obj
44         self.obj.inc_use()
45
46     def release(self):
47         self.obj.dec_use()
48
49     def flush(self):
50         return self.obj.flush()
51
52
53 class FileHandle(Handle):
54     """Connects a numeric file handle to a File  object that has
55     been opened by the client."""
56     pass
57
58
59 class DirectoryHandle(Handle):
60     """Connects a numeric file handle to a Directory object that has
61     been opened by the client."""
62
63     def __init__(self, fh, dirobj, entries):
64         super(DirectoryHandle, self).__init__(fh, dirobj)
65         self.entries = entries
66
67
68 class InodeCache(object):
69     def __init__(self, cap, min_entries=4):
70         self._entries = collections.OrderedDict()
71         self._by_uuid = {}
72         self._counter = itertools.count(0)
73         self.cap = cap
74         self._total = 0
75         self.min_entries = min_entries
76
77     def total(self):
78         return self._total
79
80     def _remove(self, obj, clear):
81         if clear and not obj.clear():
82             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
83             return False
84         self._total -= obj.cache_size
85         del self._entries[obj.cache_priority]
86         if obj.cache_uuid:
87             del self._by_uuid[obj.cache_uuid]
88             obj.cache_uuid = None
89         if clear:
90             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
91         return True
92
93     def cap_cache(self):
94         #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
95         if self._total > self.cap:
96             for key in list(self._entries.keys()):
97                 if self._total < self.cap or len(self._entries) < self.min_entries:
98                     break
99                 self._remove(self._entries[key], True)
100
101     def manage(self, obj):
102         if obj.persisted():
103             obj.cache_priority = next(self._counter)
104             obj.cache_size = obj.objsize()
105             self._entries[obj.cache_priority] = obj
106             obj.cache_uuid = obj.uuid()
107             if obj.cache_uuid:
108                 self._by_uuid[obj.cache_uuid] = obj
109             self._total += obj.objsize()
110             _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
111             self.cap_cache()
112         else:
113             obj.cache_priority = None
114
115     def touch(self, obj):
116         if obj.persisted():
117             if obj.cache_priority in self._entries:
118                 self._remove(obj, False)
119             self.manage(obj)
120
121     def unmanage(self, obj):
122         if obj.persisted() and obj.cache_priority in self._entries:
123             self._remove(obj, True)
124
125     def find(self, uuid):
126         return self._by_uuid.get(uuid)
127
128 class Inodes(object):
129     """Manage the set of inodes.  This is the mapping from a numeric id
130     to a concrete File or Directory object"""
131
132     def __init__(self, inode_cache):
133         self._entries = {}
134         self._counter = itertools.count(llfuse.ROOT_INODE)
135         self.inode_cache = inode_cache
136
137     def __getitem__(self, item):
138         return self._entries[item]
139
140     def __setitem__(self, key, item):
141         self._entries[key] = item
142
143     def __iter__(self):
144         return self._entries.iterkeys()
145
146     def items(self):
147         return self._entries.items()
148
149     def __contains__(self, k):
150         return k in self._entries
151
152     def touch(self, entry):
153         entry._atime = time.time()
154         self.inode_cache.touch(entry)
155
156     def add_entry(self, entry):
157         entry.inode = next(self._counter)
158         if entry.inode == llfuse.ROOT_INODE:
159             entry.inc_ref()
160         self._entries[entry.inode] = entry
161         self.inode_cache.manage(entry)
162         return entry
163
164     def del_entry(self, entry):
165         if entry.ref_count == 0:
166             _logger.debug("Deleting inode %i", entry.inode)
167             self.inode_cache.unmanage(entry)
168             llfuse.invalidate_inode(entry.inode)
169             del self._entries[entry.inode]
170             entry.inode = None
171         else:
172             entry.dead = True
173             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
174
175 def catch_exceptions(orig_func):
176     @functools.wraps(orig_func)
177     def catch_exceptions_wrapper(self, *args, **kwargs):
178         try:
179             return orig_func(self, *args, **kwargs)
180         except llfuse.FUSEError:
181             raise
182         except EnvironmentError as e:
183             raise llfuse.FUSEError(e.errno)
184         except:
185             _logger.exception("Unhandled exception during FUSE operation")
186             raise llfuse.FUSEError(errno.EIO)
187
188     return catch_exceptions_wrapper
189
190
191 class Operations(llfuse.Operations):
192     """This is the main interface with llfuse.
193
194     The methods on this object are called by llfuse threads to service FUSE
195     events to query and read from the file system.
196
197     llfuse has its own global lock which is acquired before calling a request handler,
198     so request handlers do not run concurrently unless the lock is explicitly released
199     using 'with llfuse.lock_released:'
200
201     """
202
203     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
204         super(Operations, self).__init__()
205
206         if not inode_cache:
207             inode_cache = InodeCache(cap=256*1024*1024)
208         self.inodes = Inodes(inode_cache)
209         self.uid = uid
210         self.gid = gid
211         self.encoding = encoding
212
213         # dict of inode to filehandle
214         self._filehandles = {}
215         self._filehandles_counter = itertools.count(0)
216
217         # Other threads that need to wait until the fuse driver
218         # is fully initialized should wait() on this event object.
219         self.initlock = threading.Event()
220
221         self.num_retries = num_retries
222
223         self.events = None
224
225     def init(self):
226         # Allow threads that are waiting for the driver to be finished
227         # initializing to continue
228         self.initlock.set()
229
230     def destroy(self):
231         if self.events:
232             self.events.close()
233
234     def access(self, inode, mode, ctx):
235         return True
236
237     def listen_for_events(self, api_client):
238         self.event = arvados.events.subscribe(api_client,
239                                  [["event_type", "in", ["create", "update", "delete"]]],
240                                  self.on_event)
241
242     def on_event(self, ev):
243         if 'event_type' in ev:
244             with llfuse.lock:
245                 item = self.inodes.inode_cache.find(ev["object_uuid"])
246                 if item is not None:
247                     item.invalidate()
248                     item.update()
249
250                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
251                 if itemparent is not None:
252                     itemparent.invalidate()
253                     itemparent.update()
254
255     @catch_exceptions
256     def getattr(self, inode):
257         if inode not in self.inodes:
258             raise llfuse.FUSEError(errno.ENOENT)
259
260         e = self.inodes[inode]
261
262         entry = llfuse.EntryAttributes()
263         entry.st_ino = inode
264         entry.generation = 0
265         entry.entry_timeout = 300
266         entry.attr_timeout = 300
267
268         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
269         if isinstance(e, Directory):
270             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
271         else:
272             entry.st_mode |= stat.S_IFREG
273             if isinstance(e, FuseArvadosFile):
274                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
275
276         if e.writable():
277             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
278
279         entry.st_nlink = 1
280         entry.st_uid = self.uid
281         entry.st_gid = self.gid
282         entry.st_rdev = 0
283
284         entry.st_size = e.size()
285
286         entry.st_blksize = 512
287         entry.st_blocks = (e.size()/512)+1
288         entry.st_atime = int(e.atime())
289         entry.st_mtime = int(e.mtime())
290         entry.st_ctime = int(e.mtime())
291
292         return entry
293
294     @catch_exceptions
295     def setattr(self, inode, attr):
296         entry = self.getattr(inode)
297
298         e = self.inodes[inode]
299
300         if attr.st_size is not None and isinstance(e, FuseArvadosFile):
301             with llfuse.lock_released:
302                 e.arvfile.truncate(attr.st_size)
303                 entry.st_size = e.arvfile.size()
304
305         return entry
306
307     @catch_exceptions
308     def lookup(self, parent_inode, name):
309         name = unicode(name, self.encoding)
310         inode = None
311
312         if name == '.':
313             inode = parent_inode
314         else:
315             if parent_inode in self.inodes:
316                 p = self.inodes[parent_inode]
317                 if name == '..':
318                     inode = p.parent_inode
319                 elif isinstance(p, Directory) and name in p:
320                     inode = p[name].inode
321
322         if inode != None:
323             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
324                       parent_inode, name, inode)
325             self.inodes[inode].inc_ref()
326             return self.getattr(inode)
327         else:
328             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
329                       parent_inode, name)
330             raise llfuse.FUSEError(errno.ENOENT)
331
332     @catch_exceptions
333     def forget(self, inodes):
334         for inode, nlookup in inodes:
335             ent = self.inodes[inode]
336             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
337             if ent.dec_ref(nlookup) == 0 and ent.dead:
338                 self.inodes.del_entry(ent)
339
340     @catch_exceptions
341     def open(self, inode, flags):
342         if inode in self.inodes:
343             p = self.inodes[inode]
344         else:
345             raise llfuse.FUSEError(errno.ENOENT)
346
347         if isinstance(p, Directory):
348             raise llfuse.FUSEError(errno.EISDIR)
349
350         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
351             raise llfuse.FUSEError(errno.EPERM)
352
353         fh = next(self._filehandles_counter)
354         self._filehandles[fh] = FileHandle(fh, p)
355         self.inodes.touch(p)
356         return fh
357
358     @catch_exceptions
359     def read(self, fh, off, size):
360         _logger.debug("arv-mount read %i %i %i", fh, off, size)
361         if fh in self._filehandles:
362             handle = self._filehandles[fh]
363         else:
364             raise llfuse.FUSEError(errno.EBADF)
365
366         self.inodes.touch(handle.obj)
367
368         try:
369             with llfuse.lock_released:
370                 return handle.obj.readfrom(off, size, self.num_retries)
371         except arvados.errors.NotFoundError as e:
372             _logger.warning("Block not found: " + str(e))
373             raise llfuse.FUSEError(errno.EIO)
374
375     @catch_exceptions
376     def write(self, fh, off, buf):
377         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
378         if fh in self._filehandles:
379             handle = self._filehandles[fh]
380         else:
381             raise llfuse.FUSEError(errno.EBADF)
382
383         if not handle.obj.writable():
384             raise llfuse.FUSEError(errno.EPERM)
385
386         self.inodes.touch(handle.obj)
387
388         with llfuse.lock_released:
389             return handle.obj.writeto(off, buf, self.num_retries)
390
391     @catch_exceptions
392     def release(self, fh):
393         if fh in self._filehandles:
394             try:
395                 self._filehandles[fh].flush()
396             except EnvironmentError as e:
397                 raise llfuse.FUSEError(e.errno)
398             except Exception:
399                 _logger.exception("Flush error")
400             self._filehandles[fh].release()
401             del self._filehandles[fh]
402         self.inodes.inode_cache.cap_cache()
403
404     def releasedir(self, fh):
405         self.release(fh)
406
407     @catch_exceptions
408     def opendir(self, inode):
409         _logger.debug("arv-mount opendir: inode %i", inode)
410
411         if inode in self.inodes:
412             p = self.inodes[inode]
413         else:
414             raise llfuse.FUSEError(errno.ENOENT)
415
416         if not isinstance(p, Directory):
417             raise llfuse.FUSEError(errno.ENOTDIR)
418
419         fh = next(self._filehandles_counter)
420         if p.parent_inode in self.inodes:
421             parent = self.inodes[p.parent_inode]
422         else:
423             raise llfuse.FUSEError(errno.EIO)
424
425         # update atime
426         self.inodes.touch(p)
427
428         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
429         return fh
430
431     @catch_exceptions
432     def readdir(self, fh, off):
433         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
434
435         if fh in self._filehandles:
436             handle = self._filehandles[fh]
437         else:
438             raise llfuse.FUSEError(errno.EBADF)
439
440         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
441
442         e = off
443         while e < len(handle.entries):
444             if handle.entries[e][1].inode in self.inodes:
445                 try:
446                     yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
447                 except UnicodeEncodeError:
448                     pass
449             e += 1
450
451     @catch_exceptions
452     def statfs(self):
453         st = llfuse.StatvfsData()
454         st.f_bsize = 64 * 1024
455         st.f_blocks = 0
456         st.f_files = 0
457
458         st.f_bfree = 0
459         st.f_bavail = 0
460
461         st.f_ffree = 0
462         st.f_favail = 0
463
464         st.f_frsize = 0
465         return st
466
467     def _check_writable(self, inode_parent):
468         if inode_parent in self.inodes:
469             p = self.inodes[inode_parent]
470         else:
471             raise llfuse.FUSEError(errno.ENOENT)
472
473         if not isinstance(p, Directory):
474             raise llfuse.FUSEError(errno.ENOTDIR)
475
476         if not p.writable():
477             raise llfuse.FUSEError(errno.EPERM)
478
479         return p
480
481     @catch_exceptions
482     def create(self, inode_parent, name, mode, flags, ctx):
483         p = self._check_writable(inode_parent)
484         p.create(name)
485
486         # The file entry should have been implicitly created by callback.
487         f = p[name]
488         fh = next(self._filehandles_counter)
489         self._filehandles[fh] = FileHandle(fh, f)
490         self.inodes.touch(p)
491
492         f.inc_ref()
493         return (fh, self.getattr(f.inode))
494
495     @catch_exceptions
496     def mkdir(self, inode_parent, name, mode, ctx):
497         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
498
499         p = self._check_writable(inode_parent)
500         p.mkdir(name)
501
502         # The dir entry should have been implicitly created by callback.
503         d = p[name]
504
505         d.inc_ref()
506         return self.getattr(d.inode)
507
508     @catch_exceptions
509     def unlink(self, inode_parent, name):
510         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
511         p = self._check_writable(inode_parent)
512         p.unlink(name)
513
514     @catch_exceptions
515     def rmdir(self, inode_parent, name):
516         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
517         p = self._check_writable(inode_parent)
518         p.rmdir(name)
519
520     @catch_exceptions
521     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
522         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
523         src = self._check_writable(inode_parent_old)
524         dest = self._check_writable(inode_parent_new)
525         dest.rename(name_old, name_new, src)
526
527     @catch_exceptions
528     def flush(self, fh):
529         if fh in self._filehandles:
530             self._filehandles[fh].flush()
531
532     def fsync(self, fh, datasync):
533         self.flush(fh)
534
535     def fsyncdir(self, fh, datasync):
536         self.flush(fh)