Merge remote-tracking branch 'origin/master' into pete-fixes
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 from __future__ import print_function
6 import os
7 import sys
8
9 import llfuse
10 import errno
11 import stat
12 import threading
13 import arvados
14 import pprint
15 import arvados.events
16 import re
17 import apiclient
18 import json
19 import logging
20
21 from time import time
22 from llfuse import FUSEError
23
24 class FreshBase(object):
25     '''Base class for maintaining fresh/stale state to determine when to update.'''
26     def __init__(self):
27         self._stale = True
28         self._poll = False
29         self._last_update = time()
30         self._poll_time = 60
31
32     # Mark the value as stale
33     def invalidate(self):
34         self._stale = True
35
36     # Test if the entries dict is stale
37     def stale(self):
38         if self._stale:
39             return True
40         if self._poll:
41             return (self._last_update + self._poll_time) < time()
42         return False
43
44     def fresh(self):
45         self._stale = False
46         self._last_update = time()
47
48
49 class File(FreshBase):
50     '''Base for file objects.'''
51
52     def __init__(self, parent_inode):
53         super(File, self).__init__()
54         self.inode = None
55         self.parent_inode = parent_inode
56
57     def size(self):
58         return 0
59
60     def readfrom(self, off, size):
61         return ''
62
63
64 class StreamReaderFile(File):
65     '''Wraps a StreamFileReader as a file.'''
66
67     def __init__(self, parent_inode, reader):
68         super(StreamReaderFile, self).__init__(parent_inode)
69         self.reader = reader
70
71     def size(self):
72         return self.reader.size()
73
74     def readfrom(self, off, size):
75         return self.reader.readfrom(off, size)
76
77     def stale(self):
78         return False
79
80
81 class ObjectFile(File):
82     '''Wraps a dict as a serialized json object.'''
83
84     def __init__(self, parent_inode, contents):
85         super(ObjectFile, self).__init__(parent_inode)
86         self.contentsdict = contents
87         self.uuid = self.contentsdict['uuid']
88         self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
89
90     def size(self):
91         return len(self.contents)
92
93     def readfrom(self, off, size):
94         return self.contents[off:(off+size)]
95
96
97 class Directory(FreshBase):
98     '''Generic directory object, backed by a dict.
99     Consists of a set of entries with the key representing the filename
100     and the value referencing a File or Directory object.
101     '''
102
103     def __init__(self, parent_inode):
104         super(Directory, self).__init__()
105
106         '''parent_inode is the integer inode number'''
107         self.inode = None
108         if not isinstance(parent_inode, int):
109             raise Exception("parent_inode should be an int")
110         self.parent_inode = parent_inode
111         self._entries = {}
112
113     #  Overriden by subclasses to implement logic to update the entries dict
114     #  when the directory is stale
115     def update(self):
116         pass
117
118     # Only used when computing the size of the disk footprint of the directory
119     # (stub)
120     def size(self):
121         return 0
122
123     def checkupdate(self):
124         if self.stale():
125             try:
126                 self.update()
127             except apiclient.errors.HttpError as e:
128                 logging.debug(e)
129
130     def __getitem__(self, item):
131         self.checkupdate()
132         return self._entries[item]
133
134     def items(self):
135         self.checkupdate()
136         return self._entries.items()
137
138     def __iter__(self):
139         self.checkupdate()
140         return self._entries.iterkeys()
141
142     def __contains__(self, k):
143         self.checkupdate()
144         return k in self._entries
145
146     def merge(self, items, fn, same, new_entry):
147         '''Helper method for updating the contents of the directory.
148
149         items: array with new directory contents
150
151         fn: function to take an entry in 'items' and return the desired file or
152         directory name
153
154         same: function to compare an existing entry with an entry in the items
155         list to determine whether to keep the existing entry.
156
157         new_entry: function to create a new directory entry from array entry.
158         '''
159
160         oldentries = self._entries
161         self._entries = {}
162         for i in items:
163             n = fn(i)
164             if n in oldentries and same(oldentries[n], i):
165                 self._entries[n] = oldentries[n]
166                 del oldentries[n]
167             else:
168                 self._entries[n] = self.inodes.add_entry(new_entry(i))
169         for n in oldentries:
170             llfuse.invalidate_entry(self.inode, str(n))
171             self.inodes.del_entry(oldentries[n])
172         self.fresh()
173
174
175 class CollectionDirectory(Directory):
176     '''Represents the root of a directory tree holding a collection.'''
177
178     def __init__(self, parent_inode, inodes, collection_locator):
179         super(CollectionDirectory, self).__init__(parent_inode)
180         self.inodes = inodes
181         self.collection_locator = collection_locator
182
183     def same(self, i):
184         return i['uuid'] == self.collection_locator
185
186     def update(self):
187         try:
188             collection = arvados.CollectionReader(self.collection_locator)
189             for s in collection.all_streams():
190                 cwd = self
191                 for part in s.name().split('/'):
192                     if part != '' and part != '.':
193                         if part not in cwd._entries:
194                             cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
195                         cwd = cwd._entries[part]
196                 for k, v in s.files().items():
197                     cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
198             self.fresh()
199             return True
200         except Exception as detail:
201             logging.debug("arv-mount %s: error: %s" % (self.collection_locator,detail))
202             return False
203
204 class MagicDirectory(Directory):
205     '''A special directory that logically contains the set of all extant keep
206     locators.  When a file is referenced by lookup(), it is tested to see if it
207     is a valid keep locator to a manifest, and if so, loads the manifest
208     contents as a subdirectory of this directory with the locator as the
209     directory name.  Since querying a list of all extant keep locators is
210     impractical, only collections that have already been accessed are visible
211     to readdir().
212     '''
213
214     def __init__(self, parent_inode, inodes):
215         super(MagicDirectory, self).__init__(parent_inode)
216         self.inodes = inodes
217
218     def __contains__(self, k):
219         if k in self._entries:
220             return True
221         try:
222             e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, k))
223             if e.update():
224                 self._entries[k] = e
225                 return True
226             else:
227                 return False
228         except Exception as e:
229             logging.debug('arv-mount exception keep %s', e)
230             return False
231
232     def __getitem__(self, item):
233         if item in self:
234             return self._entries[item]
235         else:
236             raise KeyError("No collection with id " + item)
237
238 class TagsDirectory(Directory):
239     '''A special directory that contains as subdirectories all tags visible to the user.'''
240
241     def __init__(self, parent_inode, inodes, api, poll_time=60):
242         super(TagsDirectory, self).__init__(parent_inode)
243         self.inodes = inodes
244         self.api = api
245         try:
246             arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
247         except:
248             self._poll = True
249             self._poll_time = poll_time
250
251     def invalidate(self):
252         with llfuse.lock:
253             super(TagsDirectory, self).invalidate()
254             for a in self._entries:
255                 self._entries[a].invalidate()
256
257     def update(self):
258         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
259         if "items" in tags:
260             self.merge(tags['items'],
261                        lambda i: i['name'],
262                        lambda a, i: a.tag == i,
263                        lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
264
265 class TagDirectory(Directory):
266     '''A special directory that contains as subdirectories all collections visible
267     to the user that are tagged with a particular tag.
268     '''
269
270     def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
271         super(TagDirectory, self).__init__(parent_inode)
272         self.inodes = inodes
273         self.api = api
274         self.tag = tag
275         self._poll = poll
276         self._poll_time = poll_time
277
278     def update(self):
279         taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
280                                                ['name', '=', self.tag],
281                                                ['head_uuid', 'is_a', 'arvados#collection']],
282                                       select=['head_uuid']).execute()
283         self.merge(taggedcollections['items'],
284                    lambda i: i['head_uuid'],
285                    lambda a, i: a.collection_locator == i['head_uuid'],
286                    lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
287
288
289 class GroupsDirectory(Directory):
290     '''A special directory that contains as subdirectories all groups visible to the user.'''
291
292     def __init__(self, parent_inode, inodes, api, poll_time=60):
293         super(GroupsDirectory, self).__init__(parent_inode)
294         self.inodes = inodes
295         self.api = api
296         try:
297             arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
298         except:
299             self._poll = True
300             self._poll_time = poll_time
301
302     def invalidate(self):
303         with llfuse.lock:
304             super(GroupsDirectory, self).invalidate()
305             for a in self._entries:
306                 self._entries[a].invalidate()
307
308     def update(self):
309         groups = self.api.groups().list().execute()
310         self.merge(groups['items'],
311                    lambda i: i['uuid'],
312                    lambda a, i: a.uuid == i['uuid'],
313                    lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
314
315
316 class GroupDirectory(Directory):
317     '''A special directory that contains the contents of a group.'''
318
319     def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
320         super(GroupDirectory, self).__init__(parent_inode)
321         self.inodes = inodes
322         self.api = api
323         self.uuid = uuid['uuid']
324         self._poll = poll
325         self._poll_time = poll_time
326
327     def invalidate(self):
328         with llfuse.lock:
329             super(GroupDirectory, self).invalidate()
330             for a in self._entries:
331                 self._entries[a].invalidate()
332
333     def createDirectory(self, i):
334         if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
335             return CollectionDirectory(self.inode, self.inodes, i['uuid'])
336         elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
337             return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
338         elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
339             return ObjectFile(self.parent_inode, i)
340         return None
341
342     def update(self):
343         contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
344         links = {}
345         for a in contents['links']:
346             links[a['head_uuid']] = a['name']
347
348         def choose_name(i):
349             if i['uuid'] in links:
350                 return links[i['uuid']]
351             else:
352                 return i['uuid']
353
354         def same(a, i):
355             if isinstance(a, CollectionDirectory):
356                 return a.collection_locator == i['uuid']
357             elif isinstance(a, GroupDirectory):
358                 return a.uuid == i['uuid']
359             elif isinstance(a, ObjectFile):
360                 return a.uuid == i['uuid'] and not a.stale()
361             return False
362
363         self.merge(contents['items'],
364                    choose_name,
365                    same,
366                    self.createDirectory)
367
368
369 class FileHandle(object):
370     '''Connects a numeric file handle to a File or Directory object that has
371     been opened by the client.'''
372
373     def __init__(self, fh, entry):
374         self.fh = fh
375         self.entry = entry
376
377
378 class Inodes(object):
379     '''Manage the set of inodes.  This is the mapping from a numeric id
380     to a concrete File or Directory object'''
381
382     def __init__(self):
383         self._entries = {}
384         self._counter = llfuse.ROOT_INODE
385
386     def __getitem__(self, item):
387         return self._entries[item]
388
389     def __setitem__(self, key, item):
390         self._entries[key] = item
391
392     def __iter__(self):
393         return self._entries.iterkeys()
394
395     def items(self):
396         return self._entries.items()
397
398     def __contains__(self, k):
399         return k in self._entries
400
401     def add_entry(self, entry):
402         entry.inode = self._counter
403         self._entries[entry.inode] = entry
404         self._counter += 1
405         return entry
406
407     def del_entry(self, entry):
408         llfuse.invalidate_inode(entry.inode)
409         del self._entries[entry.inode]
410
411 class Operations(llfuse.Operations):
412     '''This is the main interface with llfuse.  The methods on this object are
413     called by llfuse threads to service FUSE events to query and read from
414     the file system.
415
416     llfuse has its own global lock which is acquired before calling a request handler,
417     so request handlers do not run concurrently unless the lock is explicitly released
418     with llfuse.lock_released.'''
419
420     def __init__(self, uid, gid, debug=False):
421         super(Operations, self).__init__()
422
423         if debug:
424             logging.basicConfig(level=logging.DEBUG)
425             logging.info("arv-mount debug enabled")
426
427         self.inodes = Inodes()
428         self.uid = uid
429         self.gid = gid
430
431         # dict of inode to filehandle
432         self._filehandles = {}
433         self._filehandles_counter = 1
434
435         # Other threads that need to wait until the fuse driver
436         # is fully initialized should wait() on this event object.
437         self.initlock = threading.Event()
438
439     def init(self):
440         # Allow threads that are waiting for the driver to be finished
441         # initializing to continue
442         self.initlock.set()
443
444     def access(self, inode, mode, ctx):
445         return True
446
447     def getattr(self, inode):
448         if inode not in self.inodes:
449             raise llfuse.FUSEError(errno.ENOENT)
450
451         e = self.inodes[inode]
452
453         entry = llfuse.EntryAttributes()
454         entry.st_ino = inode
455         entry.generation = 0
456         entry.entry_timeout = 300
457         entry.attr_timeout = 300
458
459         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
460         if isinstance(e, Directory):
461             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
462         else:
463             entry.st_mode |= stat.S_IFREG
464
465         entry.st_nlink = 1
466         entry.st_uid = self.uid
467         entry.st_gid = self.gid
468         entry.st_rdev = 0
469
470         entry.st_size = e.size()
471
472         entry.st_blksize = 1024
473         entry.st_blocks = e.size()/1024
474         if e.size()/1024 != 0:
475             entry.st_blocks += 1
476         entry.st_atime = 0
477         entry.st_mtime = 0
478         entry.st_ctime = 0
479
480         return entry
481
482     def lookup(self, parent_inode, name):
483         logging.debug("arv-mount lookup: parent_inode %i name %s", parent_inode, name)
484         inode = None
485
486         if name == '.':
487             inode = parent_inode
488         else:
489             if parent_inode in self.inodes:
490                 p = self.inodes[parent_inode]
491                 if name == '..':
492                     inode = p.parent_inode
493                 elif name in p:
494                     inode = p[name].inode
495
496         if inode != None:
497             return self.getattr(inode)
498         else:
499             raise llfuse.FUSEError(errno.ENOENT)
500
501     def open(self, inode, flags):
502         if inode in self.inodes:
503             p = self.inodes[inode]
504         else:
505             raise llfuse.FUSEError(errno.ENOENT)
506
507         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
508             raise llfuse.FUSEError(errno.EROFS)
509
510         if isinstance(p, Directory):
511             raise llfuse.FUSEError(errno.EISDIR)
512
513         fh = self._filehandles_counter
514         self._filehandles_counter += 1
515         self._filehandles[fh] = FileHandle(fh, p)
516         return fh
517
518     def read(self, fh, off, size):
519         logging.debug("arv-mount read %i %i %i", fh, off, size)
520         if fh in self._filehandles:
521             handle = self._filehandles[fh]
522         else:
523             raise llfuse.FUSEError(errno.EBADF)
524
525         try:
526             with llfuse.lock_released:
527                 return handle.entry.readfrom(off, size)
528         except:
529             raise llfuse.FUSEError(errno.EIO)
530
531     def release(self, fh):
532         if fh in self._filehandles:
533             del self._filehandles[fh]
534
535     def opendir(self, inode):
536         logging.debug("arv-mount opendir: inode %i", inode)
537
538         if inode in self.inodes:
539             p = self.inodes[inode]
540         else:
541             raise llfuse.FUSEError(errno.ENOENT)
542
543         if not isinstance(p, Directory):
544             raise llfuse.FUSEError(errno.ENOTDIR)
545
546         fh = self._filehandles_counter
547         self._filehandles_counter += 1
548         if p.parent_inode in self.inodes:
549             parent = self.inodes[p.parent_inode]
550         else:
551             raise llfuse.FUSEError(errno.EIO)
552
553         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
554         return fh
555
556     def readdir(self, fh, off):
557         logging.debug("arv-mount readdir: fh %i off %i", fh, off)
558
559         if fh in self._filehandles:
560             handle = self._filehandles[fh]
561         else:
562             raise llfuse.FUSEError(errno.EBADF)
563
564         logging.debug("arv-mount handle.entry %s", handle.entry)
565
566         e = off
567         while e < len(handle.entry):
568             if handle.entry[e][1].inode in self.inodes:
569                 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
570             e += 1
571
572     def releasedir(self, fh):
573         del self._filehandles[fh]
574
575     def statfs(self):
576         st = llfuse.StatvfsData()
577         st.f_bsize = 1024 * 1024
578         st.f_blocks = 0
579         st.f_files = 0
580
581         st.f_bfree = 0
582         st.f_bavail = 0
583
584         st.f_ffree = 0
585         st.f_favail = 0
586
587         st.f_frsize = 0
588         return st
589
590     # The llfuse documentation recommends only overloading functions that
591     # are actually implemented, as the default implementation will raise ENOSYS.
592     # However, there is a bug in the llfuse default implementation of create()
593     # "create() takes exactly 5 positional arguments (6 given)" which will crash
594     # arv-mount.
595     # The workaround is to implement it with the proper number of parameters,
596     # and then everything works out.
597     def create(self, p1, p2, p3, p4, p5):
598         raise llfuse.FUSEError(errno.EROFS)