3198: Support mkdir() and rmdir() to create collections on projects. Support
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24 import collections
25 import functools
26
27 from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
28 from fusefile import StringFile, FuseArvadosFile
29
30 _logger = logging.getLogger('arvados.arvados_fuse')
31
32 log_handler = logging.StreamHandler()
33 llogger = logging.getLogger('llfuse')
34 llogger.addHandler(log_handler)
35 llogger.setLevel(logging.DEBUG)
36
37 class Handle(object):
38     """Connects a numeric file handle to a File or Directory object that has
39     been opened by the client."""
40
41     def __init__(self, fh, obj):
42         self.fh = fh
43         self.obj = obj
44         self.obj.inc_use()
45
46     def release(self):
47         self.obj.dec_use()
48
49     def flush(self):
50         with llfuse.lock_released:
51             return self.obj.flush()
52
53
54 class FileHandle(Handle):
55     """Connects a numeric file handle to a File  object that has
56     been opened by the client."""
57     pass
58
59
60 class DirectoryHandle(Handle):
61     """Connects a numeric file handle to a Directory object that has
62     been opened by the client."""
63
64     def __init__(self, fh, dirobj, entries):
65         super(DirectoryHandle, self).__init__(fh, dirobj)
66         self.entries = entries
67
68
69 class InodeCache(object):
70     def __init__(self, cap, min_entries=4):
71         self._entries = collections.OrderedDict()
72         self._by_uuid = {}
73         self._counter = itertools.count(0)
74         self.cap = cap
75         self._total = 0
76         self.min_entries = min_entries
77
78     def total(self):
79         return self._total
80
81     def _remove(self, obj, clear):
82         if clear and not obj.clear():
83             _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
84             return False
85         self._total -= obj.cache_size
86         del self._entries[obj.cache_priority]
87         if obj.cache_uuid:
88             del self._by_uuid[obj.cache_uuid]
89             obj.cache_uuid = None
90         if clear:
91             _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
92         return True
93
94     def cap_cache(self):
95         #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
96         if self._total > self.cap:
97             for key in list(self._entries.keys()):
98                 if self._total < self.cap or len(self._entries) < self.min_entries:
99                     break
100                 self._remove(self._entries[key], True)
101
102     def manage(self, obj):
103         if obj.persisted():
104             obj.cache_priority = next(self._counter)
105             obj.cache_size = obj.objsize()
106             self._entries[obj.cache_priority] = obj
107             obj.cache_uuid = obj.uuid()
108             if obj.cache_uuid:
109                 self._by_uuid[obj.cache_uuid] = obj
110             self._total += obj.objsize()
111             _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
112             self.cap_cache()
113         else:
114             obj.cache_priority = None
115
116     def touch(self, obj):
117         if obj.persisted():
118             if obj.cache_priority in self._entries:
119                 self._remove(obj, False)
120             self.manage(obj)
121
122     def unmanage(self, obj):
123         if obj.persisted() and obj.cache_priority in self._entries:
124             self._remove(obj, True)
125
126     def find(self, uuid):
127         return self._by_uuid.get(uuid)
128
129 class Inodes(object):
130     """Manage the set of inodes.  This is the mapping from a numeric id
131     to a concrete File or Directory object"""
132
133     def __init__(self, inode_cache):
134         self._entries = {}
135         self._counter = itertools.count(llfuse.ROOT_INODE)
136         self.inode_cache = inode_cache
137
138     def __getitem__(self, item):
139         return self._entries[item]
140
141     def __setitem__(self, key, item):
142         self._entries[key] = item
143
144     def __iter__(self):
145         return self._entries.iterkeys()
146
147     def items(self):
148         return self._entries.items()
149
150     def __contains__(self, k):
151         return k in self._entries
152
153     def touch(self, entry):
154         entry._atime = time.time()
155         self.inode_cache.touch(entry)
156
157     def add_entry(self, entry):
158         entry.inode = next(self._counter)
159         if entry.inode == llfuse.ROOT_INODE:
160             entry.inc_ref()
161         self._entries[entry.inode] = entry
162         self.inode_cache.manage(entry)
163         return entry
164
165     def del_entry(self, entry):
166         if entry.ref_count == 0:
167             _logger.debug("Deleting inode %i", entry.inode)
168             self.inode_cache.unmanage(entry)
169             llfuse.invalidate_inode(entry.inode)
170             del self._entries[entry.inode]
171             entry.inode = None
172         else:
173             entry.dead = True
174             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
175
176 def catch_exceptions(orig_func):
177     @functools.wraps(orig_func)
178     def catch_exceptions_wrapper(self, *args, **kwargs):
179         try:
180             return orig_func(self, *args, **kwargs)
181         except llfuse.FUSEError:
182             raise
183         except EnvironmentError as e:
184             raise llfuse.FUSEError(e.errno)
185         except:
186             _logger.exception("Unhandled exception during FUSE operation")
187             raise llfuse.FUSEError(errno.EIO)
188
189     return catch_exceptions_wrapper
190
191
192 class Operations(llfuse.Operations):
193     """This is the main interface with llfuse.
194
195     The methods on this object are called by llfuse threads to service FUSE
196     events to query and read from the file system.
197
198     llfuse has its own global lock which is acquired before calling a request handler,
199     so request handlers do not run concurrently unless the lock is explicitly released
200     using 'with llfuse.lock_released:'
201
202     """
203
204     def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
205         super(Operations, self).__init__()
206
207         if not inode_cache:
208             inode_cache = InodeCache(cap=256*1024*1024)
209         self.inodes = Inodes(inode_cache)
210         self.uid = uid
211         self.gid = gid
212         self.encoding = encoding
213
214         # dict of inode to filehandle
215         self._filehandles = {}
216         self._filehandles_counter = itertools.count(0)
217
218         # Other threads that need to wait until the fuse driver
219         # is fully initialized should wait() on this event object.
220         self.initlock = threading.Event()
221
222         self.num_retries = num_retries
223
224         self.events = None
225
226     def init(self):
227         # Allow threads that are waiting for the driver to be finished
228         # initializing to continue
229         self.initlock.set()
230
231     def destroy(self):
232         if self.events:
233             self.events.close()
234
235     def access(self, inode, mode, ctx):
236         return True
237
238     def listen_for_events(self, api_client):
239         self.event = arvados.events.subscribe(api_client,
240                                  [["event_type", "in", ["create", "update", "delete"]]],
241                                  self.on_event)
242
243     def on_event(self, ev):
244         if 'event_type' in ev:
245             with llfuse.lock:
246                 item = self.inodes.inode_cache.find(ev["object_uuid"])
247                 if item:
248                     item.invalidate()
249                     item.update()
250
251                 itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
252                 if itemparent:
253                     itemparent.invalidate()
254                     itemparent.update()
255
256     @catch_exceptions
257     def getattr(self, inode):
258         if inode not in self.inodes:
259             raise llfuse.FUSEError(errno.ENOENT)
260
261         e = self.inodes[inode]
262
263         entry = llfuse.EntryAttributes()
264         entry.st_ino = inode
265         entry.generation = 0
266         entry.entry_timeout = 300
267         entry.attr_timeout = 300
268
269         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
270         if isinstance(e, Directory):
271             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
272         else:
273             entry.st_mode |= stat.S_IFREG
274             if isinstance(e, FuseArvadosFile):
275                 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
276
277         if e.writable():
278             entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
279
280         entry.st_nlink = 1
281         entry.st_uid = self.uid
282         entry.st_gid = self.gid
283         entry.st_rdev = 0
284
285         entry.st_size = e.size()
286
287         entry.st_blksize = 512
288         entry.st_blocks = (e.size()/512)+1
289         entry.st_atime = int(e.atime())
290         entry.st_mtime = int(e.mtime())
291         entry.st_ctime = int(e.mtime())
292
293         return entry
294
295     @catch_exceptions
296     def lookup(self, parent_inode, name):
297         name = unicode(name, self.encoding)
298         inode = None
299
300         if name == '.':
301             inode = parent_inode
302         else:
303             if parent_inode in self.inodes:
304                 p = self.inodes[parent_inode]
305                 if name == '..':
306                     inode = p.parent_inode
307                 elif isinstance(p, Directory) and name in p:
308                     inode = p[name].inode
309
310         if inode != None:
311             _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
312                       parent_inode, name, inode)
313             self.inodes[inode].inc_ref()
314             return self.getattr(inode)
315         else:
316             _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
317                       parent_inode, name)
318             raise llfuse.FUSEError(errno.ENOENT)
319
320     @catch_exceptions
321     def forget(self, inodes):
322         for inode, nlookup in inodes:
323             ent = self.inodes[inode]
324             _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
325             if ent.dec_ref(nlookup) == 0 and ent.dead:
326                 self.inodes.del_entry(ent)
327
328     @catch_exceptions
329     def open(self, inode, flags):
330         if inode in self.inodes:
331             p = self.inodes[inode]
332         else:
333             raise llfuse.FUSEError(errno.ENOENT)
334
335         if isinstance(p, Directory):
336             raise llfuse.FUSEError(errno.EISDIR)
337
338         if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
339             raise llfuse.FUSEError(errno.EPERM)
340
341         fh = next(self._filehandles_counter)
342         self._filehandles[fh] = FileHandle(fh, p)
343         self.inodes.touch(p)
344         return fh
345
346     @catch_exceptions
347     def read(self, fh, off, size):
348         _logger.debug("arv-mount read %i %i %i", fh, off, size)
349         if fh in self._filehandles:
350             handle = self._filehandles[fh]
351         else:
352             raise llfuse.FUSEError(errno.EBADF)
353
354         self.inodes.touch(handle.obj)
355
356         try:
357             with llfuse.lock_released:
358                 return handle.obj.readfrom(off, size, self.num_retries)
359         except arvados.errors.NotFoundError as e:
360             _logger.warning("Block not found: " + str(e))
361             raise llfuse.FUSEError(errno.EIO)
362
363     @catch_exceptions
364     def write(self, fh, off, buf):
365         _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
366         if fh in self._filehandles:
367             handle = self._filehandles[fh]
368         else:
369             raise llfuse.FUSEError(errno.EBADF)
370
371         if not handle.obj.writable():
372             raise llfuse.FUSEError(errno.EPERM)
373
374         self.inodes.touch(handle.obj)
375
376         with llfuse.lock_released:
377             return handle.obj.writeto(off, buf, self.num_retries)
378
379     @catch_exceptions
380     def release(self, fh):
381         if fh in self._filehandles:
382             try:
383                 self._filehandles[fh].flush()
384             except EnvironmentError as e:
385                 raise llfuse.FUSEError(e.errno)
386             except Exception:
387                 _logger.exception("Flush error")
388             self._filehandles[fh].release()
389             del self._filehandles[fh]
390         self.inodes.inode_cache.cap_cache()
391
392     def releasedir(self, fh):
393         self.release(fh)
394
395     @catch_exceptions
396     def opendir(self, inode):
397         _logger.debug("arv-mount opendir: inode %i", inode)
398
399         if inode in self.inodes:
400             p = self.inodes[inode]
401         else:
402             raise llfuse.FUSEError(errno.ENOENT)
403
404         if not isinstance(p, Directory):
405             raise llfuse.FUSEError(errno.ENOTDIR)
406
407         fh = next(self._filehandles_counter)
408         if p.parent_inode in self.inodes:
409             parent = self.inodes[p.parent_inode]
410         else:
411             raise llfuse.FUSEError(errno.EIO)
412
413         # update atime
414         self.inodes.touch(p)
415
416         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
417         return fh
418
419     @catch_exceptions
420     def readdir(self, fh, off):
421         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
422
423         if fh in self._filehandles:
424             handle = self._filehandles[fh]
425         else:
426             raise llfuse.FUSEError(errno.EBADF)
427
428         _logger.debug("arv-mount handle.dirobj %s", handle.obj)
429
430         e = off
431         while e < len(handle.entries):
432             if handle.entries[e][1].inode in self.inodes:
433                 try:
434                     yield (handle.entries[e][0].encode(self.encoding), self.getattr(handle.entries[e][1].inode), e+1)
435                 except UnicodeEncodeError:
436                     pass
437             e += 1
438
439     @catch_exceptions
440     def statfs(self):
441         st = llfuse.StatvfsData()
442         st.f_bsize = 64 * 1024
443         st.f_blocks = 0
444         st.f_files = 0
445
446         st.f_bfree = 0
447         st.f_bavail = 0
448
449         st.f_ffree = 0
450         st.f_favail = 0
451
452         st.f_frsize = 0
453         return st
454
455     def _check_writable(self, inode_parent):
456         if inode_parent in self.inodes:
457             p = self.inodes[inode_parent]
458         else:
459             raise llfuse.FUSEError(errno.ENOENT)
460
461         if not isinstance(p, Directory):
462             raise llfuse.FUSEError(errno.ENOTDIR)
463
464         if not p.writable():
465             raise llfuse.FUSEError(errno.EPERM)
466
467         return p
468
469     @catch_exceptions
470     def create(self, inode_parent, name, mode, flags, ctx):
471         p = self._check_writable(inode_parent)
472         p.create(name)
473
474         # The file entry should have been implicitly created by callback.
475         f = p[name]
476         fh = next(self._filehandles_counter)
477         self._filehandles[fh] = FileHandle(fh, f)
478         self.inodes.touch(p)
479
480         f.inc_ref()
481         return (fh, self.getattr(f.inode))
482
483     @catch_exceptions
484     def mkdir(self, inode_parent, name, mode, ctx):
485         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
486
487         p = self._check_writable(inode_parent)
488         p.mkdir(name)
489
490         # The dir entry should have been implicitly created by callback.
491         d = p[name]
492
493         d.inc_ref()
494         return self.getattr(d.inode)
495
496     @catch_exceptions
497     def unlink(self, inode_parent, name):
498         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
499         p = self._check_writable(inode_parent)
500         p.unlink(name)
501
502     @catch_exceptions
503     def rmdir(self, inode_parent, name):
504         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
505         p = self._check_writable(inode_parent)
506         p.rmdir(name)
507
508     @catch_exceptions
509     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
510         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
511         src = self._check_writable(inode_parent_old)
512         dest = self._check_writable(inode_parent_new)
513         dest.rename(name_old, name_new, src)
514
515     @catch_exceptions
516     def flush(self, fh):
517         if fh in self._filehandles:
518             self._filehandles[fh].flush()
519
520     def fsync(self, fh, datasync):
521         self.flush(fh)
522
523     def fsyncdir(self, fh, datasync):
524         self.flush(fh)