3198: Renaming collections in projects works. Improved conformance to POSIX semantic...
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24 import collections
25 import functools
26
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
29
30 _logger = logging.getLogger('arvados.arvados_fuse')
31
32 log_handler = logging.StreamHandler()
33 llogger = logging.getLogger('llfuse')
34 llogger.addHandler(log_handler)
35 llogger.setLevel(logging.DEBUG)
36
37 class Handle(object):
38     """Connects a numeric file handle to a File or Directory object that has
39     been opened by the client."""
40
41     def __init__(self, fh, obj):
42         self.fh = fh
43         self.obj = obj
44         self.obj.inc_use()
45
46     def release(self):
47         self.obj.dec_use()
48
49     def flush(self):
50         return self.obj.flush()
51
52
53 class FileHandle(Handle):
54     """Connects a numeric file handle to a File  object that has
55     been opened by the client."""
56     pass
57
58
59 class DirectoryHandle(Handle):
60     """Connects a numeric file handle to a Directory object that has
61     been opened by the client."""
62
63     def __init__(self, fh, dirobj, entries):
64         super(DirectoryHandle, self).__init__(fh, dirobj)
65         self.entries = entries
66
67
68 class InodeCache(object):
69     def __init__(self, cap, min_entries=4):
70         self._entries = collections.OrderedDict()
71         self._by_uuid = {}
72         self._counter = itertools.count(0)
73         self.cap = cap
74         self._total = 0
75         self.min_entries = min_entries
76
77     def total(self):
78         return self._total
79
80     def _remove(self, obj, clear):
81         if clear and not obj.clear():
82             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
83             return False
84         self._total -= obj.cache_size
85         del self._entries[obj.cache_priority]
86         if obj.cache_uuid:
87             del self._by_uuid[obj.cache_uuid]
88             obj.cache_uuid = None
89         if clear:
90             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
91         return True
92
93     def cap_cache(self):
94         #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
95         if self._total > self.cap:
96             for key in list(self._entries.keys()):
97                 if self._total < self.cap or len(self._entries) < self.min_entries:
98                     break
99                 self._remove(self._entries[key], True)
100
101     def manage(self, obj):
102         if obj.persisted():
103             obj.cache_priority = next(self._counter)
104             obj.cache_size = obj.objsize()
105             self._entries[obj.cache_priority] = obj
106             obj.cache_uuid = obj.uuid()
107             if obj.cache_uuid:
108                 self._by_uuid[obj.cache_uuid] = obj
109             self._total += obj.objsize()
110             _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
111             self.cap_cache()
112         else:
113             obj.cache_priority = None
114
115     def touch(self, obj):
116         if obj.persisted():
117             if obj.cache_priority in self._entries:
118                 self._remove(obj, False)
119             self.manage(obj)
120
121     def unmanage(self, obj):
122         if obj.persisted() and obj.cache_priority in self._entries:
123             self._remove(obj, True)
124
125     def find(self, uuid):
126         return self._by_uuid.get(uuid)
127
128 class Inodes(object):
129     """Manage the set of inodes.  This is the mapping from a numeric id
130     to a concrete File or Directory object"""
131
132     def __init__(self, inode_cache):
133         self._entries = {}
134         self._counter = itertools.count(llfuse.ROOT_INODE)
135         self.inode_cache = inode_cache
136
137     def __getitem__(self, item):
138         return self._entries[item]
139
140     def __setitem__(self, key, item):
141         self._entries[key] = item
142
143     def __iter__(self):
144         return self._entries.iterkeys()
145
146     def items(self):
147         return self._entries.items()
148
149     def __contains__(self, k):
150         return k in self._entries
151
152     def touch(self, entry):
153         entry._atime = time.time()
154         self.inode_cache.touch(entry)
155
156     def add_entry(self, entry):
157         entry.inode = next(self._counter)
158         if entry.inode == llfuse.ROOT_INODE:
159             entry.inc_ref()
160         self._entries[entry.inode] = entry
161         self.inode_cache.manage(entry)
162         return entry
163
164     def del_entry(self, entry):
165         if entry.ref_count == 0:
166             _logger.debug("Deleting inode %i", entry.inode)
167             self.inode_cache.unmanage(entry)
168             llfuse.invalidate_inode(entry.inode)
169             del self._entries[entry.inode]
170             entry.inode = None
171         else:
172             entry.dead = True
173             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
174
175 def catch_exceptions(orig_func):
176     @functools.wraps(orig_func)
177     def catch_exceptions_wrapper(self, *args, **kwargs):
178         try:
179             return orig_func(self, *args, **kwargs)
180         except llfuse.FUSEError:
181             raise
182         except EnvironmentError as e:
183             raise llfuse.FUSEError(e.errno)
184         except:
185             _logger.exception("Unhandled exception during FUSE operation")
186             raise llfuse.FUSEError(errno.EIO)
187
188     return catch_exceptions_wrapper
189
190
191 class Operations(llfuse.Operations):
192     """This is the main interface with llfuse.
193
194     The methods on this object are called by llfuse threads to service FUSE
195     events to query and read from the file system.
196
197     llfuse has its own global lock which is acquired before calling a request handler,
198     so request handlers do not run concurrently unless the lock is explicitly released
199     using 'with llfuse.lock_released:'
200
201     """
202
203     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
204         super(Operations, self).__init__()
205
206         if not inode_cache:
207             inode_cache = InodeCache(cap=256*1024*1024)
208         self.inodes = Inodes(inode_cache)
209         self.uid = uid
210         self.gid = gid
211         self.encoding = encoding
212
213         # dict of inode to filehandle
214         self._filehandles = {}
215         self._filehandles_counter = itertools.count(0)
216
217         # Other threads that need to wait until the fuse driver
218         # is fully initialized should wait() on this event object.
219         self.initlock = threading.Event()
220
221         self.num_retries = num_retries
222
223         self.events = None
224
225     def init(self):
226         # Allow threads that are waiting for the driver to be finished
227         # initializing to continue
228         self.initlock.set()
229
230     def destroy(self):
231         if self.events:
232             self.events.close()
233
234     def access(self, inode, mode, ctx):
235         return True
236
237     def listen_for_events(self, api_client):
238         self.event = arvados.events.subscribe(api_client,
239                                  [["event_type", "in", ["create", "update", "delete"]]],
240                                  self.on_event)
241
242     def on_event(self, ev):
243         if 'event_type' in ev:
244             with llfuse.lock:
245                 item = self.inodes.inode_cache.find(ev["object_uuid"])
246                 if item is not None:
247                     item.invalidate()
248                     item.update()
249
250                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
251                 if itemparent is not None:
252                     itemparent.invalidate()
253                     itemparent.update()
254
255     @catch_exceptions
256     def getattr(self, inode):
257         if inode not in self.inodes:
258             raise llfuse.FUSEError(errno.ENOENT)
259
260         e = self.inodes[inode]
261
262         entry = llfuse.EntryAttributes()
263         entry.st_ino = inode
264         entry.generation = 0
265         entry.entry_timeout = 300
266         entry.attr_timeout = 300
267
268         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
269         if isinstance(e, Directory):
270             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
271         else:
272             entry.st_mode |= stat.S_IFREG
273             if isinstance(e, FuseArvadosFile):
274                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
275
276         if e.writable():
277             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
278
279         entry.st_nlink = 1
280         entry.st_uid = self.uid
281         entry.st_gid = self.gid
282         entry.st_rdev = 0
283
284         entry.st_size = e.size()
285
286         entry.st_blksize = 512
287         entry.st_blocks = (e.size()/512)+1
288         entry.st_atime = int(e.atime())
289         entry.st_mtime = int(e.mtime())
290         entry.st_ctime = int(e.mtime())
291
292         return entry
293
294     @catch_exceptions
295     def lookup(self, parent_inode, name):
296         name = unicode(name, self.encoding)
297         inode = None
298
299         if name == '.':
300             inode = parent_inode
301         else:
302             if parent_inode in self.inodes:
303                 p = self.inodes[parent_inode]
304                 if name == '..':
305                     inode = p.parent_inode
306                 elif isinstance(p, Directory) and name in p:
307                     inode = p[name].inode
308
309         if inode != None:
310             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
311                       parent_inode, name, inode)
312             self.inodes[inode].inc_ref()
313             return self.getattr(inode)
314         else:
315             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
316                       parent_inode, name)
317             raise llfuse.FUSEError(errno.ENOENT)
318
319     @catch_exceptions
320     def forget(self, inodes):
321         for inode, nlookup in inodes:
322             ent = self.inodes[inode]
323             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
324             if ent.dec_ref(nlookup) == 0 and ent.dead:
325                 self.inodes.del_entry(ent)
326
327     @catch_exceptions
328     def open(self, inode, flags):
329         if inode in self.inodes:
330             p = self.inodes[inode]
331         else:
332             raise llfuse.FUSEError(errno.ENOENT)
333
334         if isinstance(p, Directory):
335             raise llfuse.FUSEError(errno.EISDIR)
336
337         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
338             raise llfuse.FUSEError(errno.EPERM)
339
340         fh = next(self._filehandles_counter)
341         self._filehandles[fh] = FileHandle(fh, p)
342         self.inodes.touch(p)
343         return fh
344
345     @catch_exceptions
346     def read(self, fh, off, size):
347         _logger.debug("arv-mount read %i %i %i", fh, off, size)
348         if fh in self._filehandles:
349             handle = self._filehandles[fh]
350         else:
351             raise llfuse.FUSEError(errno.EBADF)
352
353         self.inodes.touch(handle.obj)
354
355         try:
356             with llfuse.lock_released:
357                 return handle.obj.readfrom(off, size, self.num_retries)
358         except arvados.errors.NotFoundError as e:
359             _logger.warning("Block not found: " + str(e))
360             raise llfuse.FUSEError(errno.EIO)
361
362     @catch_exceptions
363     def write(self, fh, off, buf):
364         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
365         if fh in self._filehandles:
366             handle = self._filehandles[fh]
367         else:
368             raise llfuse.FUSEError(errno.EBADF)
369
370         if not handle.obj.writable():
371             raise llfuse.FUSEError(errno.EPERM)
372
373         self.inodes.touch(handle.obj)
374
375         with llfuse.lock_released:
376             return handle.obj.writeto(off, buf, self.num_retries)
377
378     @catch_exceptions
379     def release(self, fh):
380         if fh in self._filehandles:
381             try:
382                 self._filehandles[fh].flush()
383             except EnvironmentError as e:
384                 raise llfuse.FUSEError(e.errno)
385             except Exception:
386                 _logger.exception("Flush error")
387             self._filehandles[fh].release()
388             del self._filehandles[fh]
389         self.inodes.inode_cache.cap_cache()
390
391     def releasedir(self, fh):
392         self.release(fh)
393
394     @catch_exceptions
395     def opendir(self, inode):
396         _logger.debug("arv-mount opendir: inode %i", inode)
397
398         if inode in self.inodes:
399             p = self.inodes[inode]
400         else:
401             raise llfuse.FUSEError(errno.ENOENT)
402
403         if not isinstance(p, Directory):
404             raise llfuse.FUSEError(errno.ENOTDIR)
405
406         fh = next(self._filehandles_counter)
407         if p.parent_inode in self.inodes:
408             parent = self.inodes[p.parent_inode]
409         else:
410             raise llfuse.FUSEError(errno.EIO)
411
412         # update atime
413         self.inodes.touch(p)
414
415         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
416         return fh
417
418     @catch_exceptions
419     def readdir(self, fh, off):
420         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
421
422         if fh in self._filehandles:
423             handle = self._filehandles[fh]
424         else:
425             raise llfuse.FUSEError(errno.EBADF)
426
427         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
428
429         e = off
430         while e < len(handle.entries):
431             if handle.entries[e][1].inode in self.inodes:
432                 try:
433                     yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
434                 except UnicodeEncodeError:
435                     pass
436             e += 1
437
438     @catch_exceptions
439     def statfs(self):
440         st = llfuse.StatvfsData()
441         st.f_bsize = 64 * 1024
442         st.f_blocks = 0
443         st.f_files = 0
444
445         st.f_bfree = 0
446         st.f_bavail = 0
447
448         st.f_ffree = 0
449         st.f_favail = 0
450
451         st.f_frsize = 0
452         return st
453
454     def _check_writable(self, inode_parent):
455         if inode_parent in self.inodes:
456             p = self.inodes[inode_parent]
457         else:
458             raise llfuse.FUSEError(errno.ENOENT)
459
460         if not isinstance(p, Directory):
461             raise llfuse.FUSEError(errno.ENOTDIR)
462
463         if not p.writable():
464             raise llfuse.FUSEError(errno.EPERM)
465
466         return p
467
468     @catch_exceptions
469     def create(self, inode_parent, name, mode, flags, ctx):
470         p = self._check_writable(inode_parent)
471         p.create(name)
472
473         # The file entry should have been implicitly created by callback.
474         f = p[name]
475         fh = next(self._filehandles_counter)
476         self._filehandles[fh] = FileHandle(fh, f)
477         self.inodes.touch(p)
478
479         f.inc_ref()
480         return (fh, self.getattr(f.inode))
481
482     @catch_exceptions
483     def mkdir(self, inode_parent, name, mode, ctx):
484         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
485
486         p = self._check_writable(inode_parent)
487         p.mkdir(name)
488
489         # The dir entry should have been implicitly created by callback.
490         d = p[name]
491
492         d.inc_ref()
493         return self.getattr(d.inode)
494
495     @catch_exceptions
496     def unlink(self, inode_parent, name):
497         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
498         p = self._check_writable(inode_parent)
499         p.unlink(name)
500
501     @catch_exceptions
502     def rmdir(self, inode_parent, name):
503         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
504         p = self._check_writable(inode_parent)
505         p.rmdir(name)
506
507     @catch_exceptions
508     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
509         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
510         src = self._check_writable(inode_parent_old)
511         dest = self._check_writable(inode_parent_new)
512         dest.rename(name_old, name_new, src)
513
514     @catch_exceptions
515     def flush(self, fh):
516         if fh in self._filehandles:
517             self._filehandles[fh].flush()
518
519     def fsync(self, fh, datasync):
520         self.flush(fh)
521
522     def fsyncdir(self, fh, datasync):
523         self.flush(fh)