Merge branch '3235-top-nav-site-search' refs #3235
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7
8 import llfuse
9 import errno
10 import stat
11 import threading
12 import arvados
13 import pprint
14 import arvados.events
15 import re
16 import apiclient
17 import json
18 import logging
19
20 from time import time
21 from llfuse import FUSEError
22
23 class FreshBase(object):
24     '''Base class for maintaining fresh/stale state to determine when to update.'''
25     def __init__(self):
26         self._stale = True
27         self._poll = False
28         self._last_update = time()
29         self._poll_time = 60
30
31     # Mark the value as stale
32     def invalidate(self):
33         self._stale = True
34
35     # Test if the entries dict is stale
36     def stale(self):
37         if self._stale:
38             return True
39         if self._poll:
40             return (self._last_update + self._poll_time) < time()
41         return False
42
43     def fresh(self):
44         self._stale = False
45         self._last_update = time()
46
47
48 class File(FreshBase):
49     '''Base for file objects.'''
50
51     def __init__(self, parent_inode):
52         super(File, self).__init__()
53         self.inode = None
54         self.parent_inode = parent_inode
55
56     def size(self):
57         return 0
58
59     def readfrom(self, off, size):
60         return ''
61
62
63 class StreamReaderFile(File):
64     '''Wraps a StreamFileReader as a file.'''
65
66     def __init__(self, parent_inode, reader):
67         super(StreamReaderFile, self).__init__(parent_inode)
68         self.reader = reader
69
70     def size(self):
71         return self.reader.size()
72
73     def readfrom(self, off, size):
74         return self.reader.readfrom(off, size)
75
76     def stale(self):
77         return False
78
79
80 class ObjectFile(File):
81     '''Wraps a dict as a serialized json object.'''
82
83     def __init__(self, parent_inode, contents):
84         super(ObjectFile, self).__init__(parent_inode)
85         self.contentsdict = contents
86         self.uuid = self.contentsdict['uuid']
87         self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
88
89     def size(self):
90         return len(self.contents)
91
92     def readfrom(self, off, size):
93         return self.contents[off:(off+size)]
94
95
96 class Directory(FreshBase):
97     '''Generic directory object, backed by a dict.
98     Consists of a set of entries with the key representing the filename
99     and the value referencing a File or Directory object.
100     '''
101
102     def __init__(self, parent_inode):
103         super(Directory, self).__init__()
104
105         '''parent_inode is the integer inode number'''
106         self.inode = None
107         if not isinstance(parent_inode, int):
108             raise Exception("parent_inode should be an int")
109         self.parent_inode = parent_inode
110         self._entries = {}
111
112     #  Overriden by subclasses to implement logic to update the entries dict
113     #  when the directory is stale
114     def update(self):
115         pass
116
117     # Only used when computing the size of the disk footprint of the directory
118     # (stub)
119     def size(self):
120         return 0
121
122     def checkupdate(self):
123         if self.stale():
124             try:
125                 self.update()
126             except apiclient.errors.HttpError as e:
127                 logging.debug(e)
128
129     def __getitem__(self, item):
130         self.checkupdate()
131         return self._entries[item]
132
133     def items(self):
134         self.checkupdate()
135         return self._entries.items()
136
137     def __iter__(self):
138         self.checkupdate()
139         return self._entries.iterkeys()
140
141     def __contains__(self, k):
142         self.checkupdate()
143         return k in self._entries
144
145     def merge(self, items, fn, same, new_entry):
146         '''Helper method for updating the contents of the directory.
147
148         items: array with new directory contents
149
150         fn: function to take an entry in 'items' and return the desired file or
151         directory name
152
153         same: function to compare an existing entry with an entry in the items
154         list to determine whether to keep the existing entry.
155
156         new_entry: function to create a new directory entry from array entry.
157         '''
158
159         oldentries = self._entries
160         self._entries = {}
161         for i in items:
162             n = fn(i)
163             if n in oldentries and same(oldentries[n], i):
164                 self._entries[n] = oldentries[n]
165                 del oldentries[n]
166             else:
167                 self._entries[n] = self.inodes.add_entry(new_entry(i))
168         for n in oldentries:
169             llfuse.invalidate_entry(self.inode, str(n))
170             self.inodes.del_entry(oldentries[n])
171         self.fresh()
172
173
174 class CollectionDirectory(Directory):
175     '''Represents the root of a directory tree holding a collection.'''
176
177     def __init__(self, parent_inode, inodes, collection_locator):
178         super(CollectionDirectory, self).__init__(parent_inode)
179         self.inodes = inodes
180         self.collection_locator = collection_locator
181
182     def same(self, i):
183         return i['uuid'] == self.collection_locator
184
185     def update(self):
186         try:
187             collection = arvados.CollectionReader(self.collection_locator)
188             for s in collection.all_streams():
189                 cwd = self
190                 for part in s.name().split('/'):
191                     if part != '' and part != '.':
192                         if part not in cwd._entries:
193                             cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
194                         cwd = cwd._entries[part]
195                 for k, v in s.files().items():
196                     cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
197             self.fresh()
198             return True
199         except Exception as detail:
200             logging.debug("arv-mount %s: error: %s" % (self.collection_locator,detail))
201             return False
202
203 class MagicDirectory(Directory):
204     '''A special directory that logically contains the set of all extant keep
205     locators.  When a file is referenced by lookup(), it is tested to see if it
206     is a valid keep locator to a manifest, and if so, loads the manifest
207     contents as a subdirectory of this directory with the locator as the
208     directory name.  Since querying a list of all extant keep locators is
209     impractical, only collections that have already been accessed are visible
210     to readdir().
211     '''
212
213     def __init__(self, parent_inode, inodes):
214         super(MagicDirectory, self).__init__(parent_inode)
215         self.inodes = inodes
216
217     def __contains__(self, k):
218         if k in self._entries:
219             return True
220         try:
221             e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, k))
222             if e.update():
223                 self._entries[k] = e
224                 return True
225             else:
226                 return False
227         except Exception as e:
228             logging.debug('arv-mount exception keep %s', e)
229             return False
230
231     def __getitem__(self, item):
232         if item in self:
233             return self._entries[item]
234         else:
235             raise KeyError("No collection with id " + item)
236
237 class TagsDirectory(Directory):
238     '''A special directory that contains as subdirectories all tags visible to the user.'''
239
240     def __init__(self, parent_inode, inodes, api, poll_time=60):
241         super(TagsDirectory, self).__init__(parent_inode)
242         self.inodes = inodes
243         self.api = api
244         try:
245             arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
246         except:
247             self._poll = True
248             self._poll_time = poll_time
249
250     def invalidate(self):
251         with llfuse.lock:
252             super(TagsDirectory, self).invalidate()
253             for a in self._entries:
254                 self._entries[a].invalidate()
255
256     def update(self):
257         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
258         if "items" in tags:
259             self.merge(tags['items'],
260                        lambda i: i['name'],
261                        lambda a, i: a.tag == i,
262                        lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
263
264 class TagDirectory(Directory):
265     '''A special directory that contains as subdirectories all collections visible
266     to the user that are tagged with a particular tag.
267     '''
268
269     def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
270         super(TagDirectory, self).__init__(parent_inode)
271         self.inodes = inodes
272         self.api = api
273         self.tag = tag
274         self._poll = poll
275         self._poll_time = poll_time
276
277     def update(self):
278         taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
279                                                ['name', '=', self.tag],
280                                                ['head_uuid', 'is_a', 'arvados#collection']],
281                                       select=['head_uuid']).execute()
282         self.merge(taggedcollections['items'],
283                    lambda i: i['head_uuid'],
284                    lambda a, i: a.collection_locator == i['head_uuid'],
285                    lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
286
287
288 class GroupsDirectory(Directory):
289     '''A special directory that contains as subdirectories all groups visible to the user.'''
290
291     def __init__(self, parent_inode, inodes, api, poll_time=60):
292         super(GroupsDirectory, self).__init__(parent_inode)
293         self.inodes = inodes
294         self.api = api
295         try:
296             arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
297         except:
298             self._poll = True
299             self._poll_time = poll_time
300
301     def invalidate(self):
302         with llfuse.lock:
303             super(GroupsDirectory, self).invalidate()
304             for a in self._entries:
305                 self._entries[a].invalidate()
306
307     def update(self):
308         groups = self.api.groups().list(
309             filters=[['group_class','=','project']]).execute()
310         self.merge(groups['items'],
311                    lambda i: i['uuid'],
312                    lambda a, i: a.uuid == i['uuid'],
313                    lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
314
315
316 class GroupDirectory(Directory):
317     '''A special directory that contains the contents of a group.'''
318
319     def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
320         super(GroupDirectory, self).__init__(parent_inode)
321         self.inodes = inodes
322         self.api = api
323         self.uuid = uuid['uuid']
324         self._poll = poll
325         self._poll_time = poll_time
326
327     def invalidate(self):
328         with llfuse.lock:
329             super(GroupDirectory, self).invalidate()
330             for a in self._entries:
331                 self._entries[a].invalidate()
332
333     def createDirectory(self, i):
334         if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
335             return CollectionDirectory(self.inode, self.inodes, i['uuid'])
336         elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
337             return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
338         elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
339             return ObjectFile(self.parent_inode, i)
340         return None
341
342     def update(self):
343         contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
344         links = {}
345         for a in contents['links']:
346             links[a['head_uuid']] = a['name']
347
348         def choose_name(i):
349             if i['uuid'] in links:
350                 return links[i['uuid']]
351             else:
352                 return i['uuid']
353
354         def same(a, i):
355             if isinstance(a, CollectionDirectory):
356                 return a.collection_locator == i['uuid']
357             elif isinstance(a, GroupDirectory):
358                 return a.uuid == i['uuid']
359             elif isinstance(a, ObjectFile):
360                 return a.uuid == i['uuid'] and not a.stale()
361             return False
362
363         self.merge(contents['items'],
364                    choose_name,
365                    same,
366                    self.createDirectory)
367
368
369 class FileHandle(object):
370     '''Connects a numeric file handle to a File or Directory object that has
371     been opened by the client.'''
372
373     def __init__(self, fh, entry):
374         self.fh = fh
375         self.entry = entry
376
377
378 class Inodes(object):
379     '''Manage the set of inodes.  This is the mapping from a numeric id
380     to a concrete File or Directory object'''
381
382     def __init__(self):
383         self._entries = {}
384         self._counter = llfuse.ROOT_INODE
385
386     def __getitem__(self, item):
387         return self._entries[item]
388
389     def __setitem__(self, key, item):
390         self._entries[key] = item
391
392     def __iter__(self):
393         return self._entries.iterkeys()
394
395     def items(self):
396         return self._entries.items()
397
398     def __contains__(self, k):
399         return k in self._entries
400
401     def add_entry(self, entry):
402         entry.inode = self._counter
403         self._entries[entry.inode] = entry
404         self._counter += 1
405         return entry
406
407     def del_entry(self, entry):
408         llfuse.invalidate_inode(entry.inode)
409         del self._entries[entry.inode]
410
411 class Operations(llfuse.Operations):
412     '''This is the main interface with llfuse.  The methods on this object are
413     called by llfuse threads to service FUSE events to query and read from
414     the file system.
415
416     llfuse has its own global lock which is acquired before calling a request handler,
417     so request handlers do not run concurrently unless the lock is explicitly released
418     with llfuse.lock_released.'''
419
420     def __init__(self, uid, gid):
421         super(Operations, self).__init__()
422
423         self.inodes = Inodes()
424         self.uid = uid
425         self.gid = gid
426
427         # dict of inode to filehandle
428         self._filehandles = {}
429         self._filehandles_counter = 1
430
431         # Other threads that need to wait until the fuse driver
432         # is fully initialized should wait() on this event object.
433         self.initlock = threading.Event()
434
435     def init(self):
436         # Allow threads that are waiting for the driver to be finished
437         # initializing to continue
438         self.initlock.set()
439
440     def access(self, inode, mode, ctx):
441         return True
442
443     def getattr(self, inode):
444         if inode not in self.inodes:
445             raise llfuse.FUSEError(errno.ENOENT)
446
447         e = self.inodes[inode]
448
449         entry = llfuse.EntryAttributes()
450         entry.st_ino = inode
451         entry.generation = 0
452         entry.entry_timeout = 300
453         entry.attr_timeout = 300
454
455         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
456         if isinstance(e, Directory):
457             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
458         else:
459             entry.st_mode |= stat.S_IFREG
460
461         entry.st_nlink = 1
462         entry.st_uid = self.uid
463         entry.st_gid = self.gid
464         entry.st_rdev = 0
465
466         entry.st_size = e.size()
467
468         entry.st_blksize = 1024
469         entry.st_blocks = e.size()/1024
470         if e.size()/1024 != 0:
471             entry.st_blocks += 1
472         entry.st_atime = 0
473         entry.st_mtime = 0
474         entry.st_ctime = 0
475
476         return entry
477
478     def lookup(self, parent_inode, name):
479         logging.debug("arv-mount lookup: parent_inode %i name %s", parent_inode, name)
480         inode = None
481
482         if name == '.':
483             inode = parent_inode
484         else:
485             if parent_inode in self.inodes:
486                 p = self.inodes[parent_inode]
487                 if name == '..':
488                     inode = p.parent_inode
489                 elif name in p:
490                     inode = p[name].inode
491
492         if inode != None:
493             return self.getattr(inode)
494         else:
495             raise llfuse.FUSEError(errno.ENOENT)
496
497     def open(self, inode, flags):
498         if inode in self.inodes:
499             p = self.inodes[inode]
500         else:
501             raise llfuse.FUSEError(errno.ENOENT)
502
503         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
504             raise llfuse.FUSEError(errno.EROFS)
505
506         if isinstance(p, Directory):
507             raise llfuse.FUSEError(errno.EISDIR)
508
509         fh = self._filehandles_counter
510         self._filehandles_counter += 1
511         self._filehandles[fh] = FileHandle(fh, p)
512         return fh
513
514     def read(self, fh, off, size):
515         logging.debug("arv-mount read %i %i %i", fh, off, size)
516         if fh in self._filehandles:
517             handle = self._filehandles[fh]
518         else:
519             raise llfuse.FUSEError(errno.EBADF)
520
521         try:
522             with llfuse.lock_released:
523                 return handle.entry.readfrom(off, size)
524         except:
525             raise llfuse.FUSEError(errno.EIO)
526
527     def release(self, fh):
528         if fh in self._filehandles:
529             del self._filehandles[fh]
530
531     def opendir(self, inode):
532         logging.debug("arv-mount opendir: inode %i", inode)
533
534         if inode in self.inodes:
535             p = self.inodes[inode]
536         else:
537             raise llfuse.FUSEError(errno.ENOENT)
538
539         if not isinstance(p, Directory):
540             raise llfuse.FUSEError(errno.ENOTDIR)
541
542         fh = self._filehandles_counter
543         self._filehandles_counter += 1
544         if p.parent_inode in self.inodes:
545             parent = self.inodes[p.parent_inode]
546         else:
547             raise llfuse.FUSEError(errno.EIO)
548
549         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
550         return fh
551
552     def readdir(self, fh, off):
553         logging.debug("arv-mount readdir: fh %i off %i", fh, off)
554
555         if fh in self._filehandles:
556             handle = self._filehandles[fh]
557         else:
558             raise llfuse.FUSEError(errno.EBADF)
559
560         logging.debug("arv-mount handle.entry %s", handle.entry)
561
562         e = off
563         while e < len(handle.entry):
564             if handle.entry[e][1].inode in self.inodes:
565                 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
566             e += 1
567
568     def releasedir(self, fh):
569         del self._filehandles[fh]
570
571     def statfs(self):
572         st = llfuse.StatvfsData()
573         st.f_bsize = 1024 * 1024
574         st.f_blocks = 0
575         st.f_files = 0
576
577         st.f_bfree = 0
578         st.f_bavail = 0
579
580         st.f_ffree = 0
581         st.f_favail = 0
582
583         st.f_frsize = 0
584         return st
585
586     # The llfuse documentation recommends only overloading functions that
587     # are actually implemented, as the default implementation will raise ENOSYS.
588     # However, there is a bug in the llfuse default implementation of create()
589     # "create() takes exactly 5 positional arguments (6 given)" which will crash
590     # arv-mount.
591     # The workaround is to implement it with the proper number of parameters,
592     # and then everything works out.
593     def create(self, p1, p2, p3, p4, p5):
594         raise llfuse.FUSEError(errno.EROFS)