3198: Support for listening for events to trigger collection updates. TODO: add...
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24 import collections
25 import functools
26
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
29
_logger = logging.getLogger('arvados.arvados_fuse')

# Route llfuse's own log records to a stderr stream handler and raise its
# verbosity to DEBUG.
# NOTE(review): forcing DEBUG on the 'llfuse' logger at import time looks like
# a development leftover; consider making it opt-in.
llogger = logging.getLogger('llfuse')
log_handler = logging.StreamHandler()
llogger.addHandler(log_handler)
llogger.setLevel(logging.DEBUG)
36
class Handle(object):
    """Tie an open file handle number to the File or Directory it refers to.

    Constructing a handle registers a use on the wrapped object (inc_use);
    release() gives that use back (dec_use).
    """

    def __init__(self, fh, obj):
        self.fh, self.obj = fh, obj
        obj.inc_use()

    def release(self):
        """Drop the use registered on the wrapped object at construction."""
        self.obj.dec_use()

    def flush(self):
        """Flush the wrapped object with the llfuse global lock released."""
        with llfuse.lock_released:
            return self.obj.flush()
52
53
class FileHandle(Handle):
    """Handle for a regular (non-directory) file the client has opened."""
58
59
class DirectoryHandle(Handle):
    """Handle for a Directory the client has opened.

    In addition to the wrapped directory, it keeps ``entries``: the listing
    supplied when the handle was created.
    """

    def __init__(self, fh, dirobj, entries):
        super(DirectoryHandle, self).__init__(fh=fh, obj=dirobj)
        self.entries = entries
67
68
class InodeCache(object):
    """Size-bounded, least-recently-touched bookkeeping for persisted objects.

    Managed objects are stored in an OrderedDict keyed by a monotonically
    increasing priority, so iteration order is oldest-touched first.  When the
    sum of the recorded sizes exceeds ``cap``, the oldest entries are asked to
    clear() themselves until the total fits again.  A minimum of a few entries
    is always kept.  Objects that report a uuid are also indexed by it for
    find().
    """

    def __init__(self, cap):
        self._entries = collections.OrderedDict()  # priority -> obj, oldest first
        self._by_uuid = {}                         # uuid -> most recently managed obj
        self._counter = itertools.count(1)
        self.cap = cap
        self._total = 0                            # sum of _cache_size over _entries

    def _remove(self, obj, clear):
        """Stop tracking obj; when ``clear`` is true, first ask obj to clear itself.

        Returns False, leaving obj tracked, if obj.clear() refuses.  Otherwise
        returns True.
        """
        if clear and not obj.clear():
            _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
            return False
        self._total -= obj._cache_size
        del self._entries[obj._cache_priority]
        if obj._cache_uuid:
            # Distinct objects may report the same uuid.  Only drop the index
            # entry if it still refers to this object; otherwise removing the
            # second one would raise KeyError.
            if self._by_uuid.get(obj._cache_uuid) is obj:
                del self._by_uuid[obj._cache_uuid]
            obj._cache_uuid = None
        _logger.debug("Cleared %s total now %i", obj, self._total)
        return True

    def cap_cache(self):
        """Evict oldest-touched entries until the total size is within ``cap``."""
        _logger.debug("total is %i cap is %i", self._total, self.cap)
        if self._total > self.cap:
            # Iterate over a snapshot of keys: clearing one object may unmanage
            # others (and _remove mutates self._entries), so a key from the
            # snapshot can already be gone.
            for key in list(self._entries.keys()):
                if self._total <= self.cap or len(self._entries) < 4:
                    break
                obj = self._entries.get(key)
                if obj is None:
                    continue
                self._remove(obj, True)

    def manage(self, obj):
        """Track obj, if persisted, as the most recently touched entry."""
        if obj.persisted():
            obj._cache_priority = next(self._counter)
            obj._cache_size = obj.objsize()
            self._entries[obj._cache_priority] = obj
            # Always assign _cache_uuid so _remove() can read it even for
            # objects that have no uuid.
            obj._cache_uuid = obj.uuid()
            if obj._cache_uuid:
                self._by_uuid[obj._cache_uuid] = obj
            # Use the recorded size so _total stays consistent with what
            # _remove() will later subtract.
            self._total += obj._cache_size
            _logger.debug("Managing %s total now %i", obj, self._total)
            self.cap_cache()
        else:
            obj._cache_priority = None

    def touch(self, obj):
        """Mark obj as most recently used, refreshing its recorded size."""
        if obj.persisted():
            if obj._cache_priority in self._entries:
                self._remove(obj, False)
            self.manage(obj)
            _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)

    def unmanage(self, obj):
        """Stop tracking obj (asking it to clear) if it is currently tracked."""
        if obj.persisted() and obj._cache_priority in self._entries:
            self._remove(obj, True)

    def find(self, uuid):
        """Return the most recently managed object with this uuid, or None."""
        return self._by_uuid.get(uuid)
124
class Inodes(object):
    """Table mapping numeric inode ids to concrete File or Directory objects.

    It also owns the InodeCache that decides when persisted objects may be
    cleared to bound memory use.
    """

    def __init__(self, inode_cache=256*1024*1024):
        self._entries = {}  # inode number -> File/Directory object
        # Inode numbers are handed out sequentially starting at
        # llfuse.ROOT_INODE, so the first entry added gets the root inode.
        self._counter = itertools.count(llfuse.ROOT_INODE)
        self.cache = InodeCache(cap=inode_cache)

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        # iter() over the dict yields inode numbers on both Python 2 and 3
        # (dict.iterkeys() does not exist on Python 3).
        return iter(self._entries)

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def touch(self, entry):
        """Update entry's access time and mark it most recently used in the cache."""
        entry._atime = time.time()
        self.cache.touch(entry)

    def add_entry(self, entry):
        """Assign entry the next inode number, register it, and return it."""
        entry.inode = next(self._counter)
        self._entries[entry.inode] = entry
        self.cache.manage(entry)
        return entry

    def del_entry(self, entry):
        """Remove entry, or only flag it ``dead`` while its ref_count is nonzero.

        A dead entry is removed later, when Operations.forget() brings its
        ref_count back to zero.
        """
        if entry.ref_count == 0:
            _logger.warning("Deleting inode %i", entry.inode)
            self.cache.unmanage(entry)
            llfuse.invalidate_inode(entry.inode)
            del self._entries[entry.inode]
        else:
            _logger.warning("Inode %i has refcount %i", entry.inode, entry.ref_count)
            entry.dead = True
168
def catch_exceptions(orig_func):
    """Decorate a FUSE request handler so that failures reach llfuse as FUSEError.

    - llfuse.FUSEError propagates unchanged.
    - EnvironmentError (OSError/IOError) becomes FUSEError(e.errno), or
      FUSEError(EIO) when the exception carries no errno.
    - Any other Exception is logged with its traceback and reported as EIO.

    KeyboardInterrupt and SystemExit are not intercepted.  For generator
    handlers (e.g. readdir) the same conversion is applied to exceptions raised
    while the generator is being iterated.  Simply calling a generator function
    runs none of its body, so wrapping only the call would catch nothing.
    """
    import inspect

    def fuse_error(exc):
        # Must be called from inside an ``except`` clause so that
        # _logger.exception() can record the active traceback.
        if isinstance(exc, EnvironmentError):
            return llfuse.FUSEError(exc.errno or errno.EIO)
        _logger.exception("Unhandled exception during FUSE operation")
        return llfuse.FUSEError(errno.EIO)

    if inspect.isgeneratorfunction(orig_func):
        @functools.wraps(orig_func)
        def catch_exceptions_wrapper(self, *args, **kwargs):
            try:
                for item in orig_func(self, *args, **kwargs):
                    yield item
            except llfuse.FUSEError:
                raise
            except Exception as e:
                raise fuse_error(e)
    else:
        @functools.wraps(orig_func)
        def catch_exceptions_wrapper(self, *args, **kwargs):
            try:
                return orig_func(self, *args, **kwargs)
            except llfuse.FUSEError:
                raise
            except Exception as e:
                raise fuse_error(e)

    return catch_exceptions_wrapper
183
184
class Operations(llfuse.Operations):
    """This is the main interface with llfuse.

    The methods on this object are called by llfuse threads to service FUSE
    events to query and read from the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using 'with llfuse.lock_released:'

    """

    def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000, num_retries=7):
        """Set up the inode table and file-handle bookkeeping.

        uid, gid: reported as the owner of every file and directory.
        encoding: used to decode names passed to lookup() and to encode names
            returned from readdir().
        inode_cache: cap handed to Inodes/InodeCache.
            NOTE(review): InodeCache compares this against summed objsize()
            values; if those are bytes, the default of 1000 is very small.
        num_retries: forwarded to readfrom()/writeto() on file objects.
        """
        super(Operations, self).__init__()

        self.inodes = Inodes(inode_cache)
        self.uid = uid
        self.gid = gid
        self.encoding = encoding

        # dict of file handle number -> FileHandle/DirectoryHandle
        self._filehandles = {}
        self._filehandles_counter = 1

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

        self.num_retries = num_retries

        # Event subscription created by listen_for_events(); closed in destroy().
        self.events = None

    def _next_fh(self):
        """Allocate and return a new, never-reused file handle number."""
        fh = self._filehandles_counter
        self._filehandles_counter += 1
        return fh

    def init(self):
        # Allow threads that are waiting for the driver to be finished
        # initializing to continue
        self.initlock.set()

    def destroy(self):
        """Close the event subscription, if listen_for_events() opened one."""
        if self.events:
            self.events.close()

    def access(self, inode, mode, ctx):
        return True

    def listen_for_events(self, api_client):
        """Subscribe to create/update/delete events, dispatching them to on_event().

        The subscription is kept on ``self.events`` so destroy() can close it.
        """
        self.events = arvados.events.subscribe(api_client,
                                 [["event_type", "in", ["create", "update", "delete"]]],
                                 self.on_event)

    def on_event(self, ev):
        """Invalidate and refresh cached objects named by an event.

        Both the object itself and its owner (parent) are refreshed if they are
        currently tracked by the inode cache.  Events lacking either uuid are
        tolerated.
        """
        if 'event_type' in ev:
            with llfuse.lock:
                item = self.inodes.cache.find(ev.get("object_uuid"))
                # Compare against None: a Directory may be falsy when empty.
                if item is not None:
                    item.invalidate()
                    item.update()

                itemparent = self.inodes.cache.find(ev.get("object_owner_uuid"))
                if itemparent is not None:
                    itemparent.invalidate()
                    itemparent.update()

    @catch_exceptions
    def getattr(self, inode):
        """Return llfuse.EntryAttributes for inode; ENOENT if it is unknown."""
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.generation = 0
        entry.entry_timeout = 300
        entry.attr_timeout = 300

        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        else:
            entry.st_mode |= stat.S_IFREG
            if isinstance(e, FuseArvadosFile):
                entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

        if e.writable():
            entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH

        entry.st_nlink = 1
        entry.st_uid = self.uid
        entry.st_gid = self.gid
        entry.st_rdev = 0

        size = e.size()
        entry.st_size = size

        entry.st_blksize = 512
        # st_blocks counts 512-byte units: round up, using floor division so
        # the result is an int on both Python 2 and 3.
        entry.st_blocks = (size + 511) // 512
        entry.st_atime = int(e.atime())
        entry.st_mtime = int(e.mtime())
        entry.st_ctime = int(e.mtime())

        return entry

    @catch_exceptions
    def lookup(self, parent_inode, name):
        """Resolve name in parent_inode, take a kernel reference, return its attributes."""
        name = unicode(name, self.encoding)
        inode = None

        if name == '.':
            inode = parent_inode
        else:
            if parent_inode in self.inodes:
                p = self.inodes[parent_inode]
                if name == '..':
                    inode = p.parent_inode
                elif isinstance(p, Directory) and name in p:
                    inode = p[name].inode

        if inode is not None:
            _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
                      parent_inode, name, inode)
            self.inodes[inode].inc_ref()
            return self.getattr(inode)
        else:
            _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
                      parent_inode, name)
            raise llfuse.FUSEError(errno.ENOENT)

    @catch_exceptions
    def forget(self, inodes):
        """Drop kernel references; delete entries already marked dead once unreferenced."""
        for inode, nlookup in inodes:
            _logger.debug("arv-mount forget: %i %i", inode, nlookup)
            ent = self.inodes[inode]
            if ent.dec_ref(nlookup) == 0 and ent.dead:
                self.inodes.del_entry(ent)

    @catch_exceptions
    def open(self, inode, flags):
        """Open a non-directory inode and return a new file handle number."""
        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
            raise llfuse.FUSEError(errno.EPERM)

        fh = self._next_fh()
        self._filehandles[fh] = FileHandle(fh, p)
        self.inodes.touch(p)
        return fh

    @catch_exceptions
    def read(self, fh, off, size):
        """Read up to size bytes at off; missing Keep blocks are reported as EIO."""
        _logger.debug("arv-mount read %i %i %i", fh, off, size)
        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        self.inodes.touch(handle.obj)

        try:
            with llfuse.lock_released:
                return handle.obj.readfrom(off, size, self.num_retries)
        except arvados.errors.NotFoundError as e:
            _logger.warning("Block not found: " + str(e))
            raise llfuse.FUSEError(errno.EIO)

    @catch_exceptions
    def write(self, fh, off, buf):
        """Write buf at off through an open handle; EPERM if the object is read-only."""
        _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        if not handle.obj.writable():
            raise llfuse.FUSEError(errno.EPERM)

        self.inodes.touch(handle.obj)

        with llfuse.lock_released:
            return handle.obj.writeto(off, buf, self.num_retries)

    @catch_exceptions
    def release(self, fh):
        """Flush and close a file handle, then let the inode cache shrink.

        The handle is always released and forgotten, even if flushing fails.
        An EnvironmentError from flush is still reported to the caller.
        """
        if fh in self._filehandles:
            handle = self._filehandles[fh]
            try:
                handle.flush()
            except EnvironmentError as e:
                raise llfuse.FUSEError(e.errno or errno.EIO)
            except Exception:
                _logger.exception("Flush error")
            finally:
                handle.release()
                del self._filehandles[fh]
        self.inodes.cache.cap_cache()

    def releasedir(self, fh):
        self.release(fh)

    @catch_exceptions
    def opendir(self, inode):
        """Snapshot a directory listing ('.', '..', then children) into a new handle."""
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
        else:
            raise llfuse.FUSEError(errno.EIO)

        # update atime
        self.inodes.touch(p)

        fh = self._next_fh()
        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
        return fh

    @catch_exceptions
    def readdir(self, fh, off):
        """Yield (name, attributes, next offset) for entries starting at off.

        Entries whose inode is no longer in the table are skipped, as are names
        that cannot be encoded with self.encoding.
        """
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        _logger.debug("arv-mount handle.dirobj %s", handle.obj)

        entries = handle.entries
        for e in range(off, len(entries)):
            name, obj = entries[e]
            if obj.inode in self.inodes:
                try:
                    yield (name.encode(self.encoding), self.getattr(obj.inode), e+1)
                except UnicodeEncodeError:
                    pass

    @catch_exceptions
    def statfs(self):
        st = llfuse.StatvfsData()
        st.f_bsize = 64 * 1024
        st.f_blocks = 0
        st.f_files = 0

        st.f_bfree = 0
        st.f_bavail = 0

        st.f_ffree = 0
        st.f_favail = 0

        st.f_frsize = 0
        return st

    def _check_writable(self, inode_parent):
        """Return the parent directory if it is a writable collection directory.

        Raises ENOENT, ENOTDIR or EPERM otherwise.
        """
        if inode_parent in self.inodes:
            p = self.inodes[inode_parent]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        if not p.writable():
            raise llfuse.FUSEError(errno.EPERM)

        if not isinstance(p, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        return p

    @catch_exceptions
    def create(self, inode_parent, name, mode, flags, ctx):
        """Create an empty file in a writable collection and open a handle on it."""
        p = self._check_writable(inode_parent)

        with llfuse.lock_released:
            # The writer returned by open() is not otherwise used, so close it
            # rather than leaking it.
            p.collection.open(name, "w").close()

        # The file entry should have been implicitly created by callback.
        f = p[name]
        fh = self._next_fh()
        self._filehandles[fh] = FileHandle(fh, f)
        self.inodes.touch(p)

        f.inc_ref()
        return (fh, self.getattr(f.inode))

    @catch_exceptions
    def mkdir(self, inode_parent, name, mode, ctx):
        p = self._check_writable(inode_parent)

        with llfuse.lock_released:
            p.collection.mkdirs(name)

        # The dir entry should have been implicitly created by callback.
        d = p[name]

        d.inc_ref()
        return self.getattr(d.inode)

    @catch_exceptions
    def unlink(self, inode_parent, name):
        p = self._check_writable(inode_parent)

        with llfuse.lock_released:
            p.collection.remove(name)

    def rmdir(self, inode_parent, name):
        self.unlink(inode_parent, name)

    @catch_exceptions
    def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
        """Move name_old in one writable collection to name_new in another (or the same)."""
        src = self._check_writable(inode_parent_old)
        dest = self._check_writable(inode_parent_new)

        with llfuse.lock_released:
            dest.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
            dest.flush()
            src.flush()

    @catch_exceptions
    def flush(self, fh):
        if fh in self._filehandles:
            self._filehandles[fh].flush()

    def fsync(self, fh, datasync):
        self.flush(fh)

    def fsyncdir(self, fh, datasync):
        self.flush(fh)
523     def fsyncdir(self, fh, datasync):
524         self.flush(fh)