Merge branch 'master' into 2767-doc-updates
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7
8 import llfuse
9 import errno
10 import stat
11 import threading
12 import arvados
13 import pprint
14 import arvados.events
15 import re
16 import apiclient
17 import json
18
19 from time import time
20 from llfuse import FUSEError
21
22 class FreshBase(object):
23     '''Base class for maintaining fresh/stale state to determine when to update.'''
24     def __init__(self):
25         self._stale = True
26         self._poll = False
27         self._last_update = time()
28         self._poll_time = 60
29
30     # Mark the value as stale
31     def invalidate(self):
32         self._stale = True
33
34     # Test if the entries dict is stale
35     def stale(self):
36         if self._stale:
37             return True
38         if self._poll:
39             return (self._last_update + self._poll_time) < time()
40         return False
41
42     def fresh(self):
43         self._stale = False
44         self._last_update = time()
45
46
47 class File(FreshBase):
48     '''Base for file objects.'''
49
50     def __init__(self, parent_inode):
51         super(File, self).__init__()
52         self.inode = None
53         self.parent_inode = parent_inode
54
55     def size(self):
56         return 0
57
58     def readfrom(self, off, size):
59         return ''
60
61
62 class StreamReaderFile(File):
63     '''Wraps a StreamFileReader as a file.'''
64
65     def __init__(self, parent_inode, reader):
66         super(StreamReaderFile, self).__init__(parent_inode)
67         self.reader = reader
68
69     def size(self):
70         return self.reader.size()
71
72     def readfrom(self, off, size):
73         return self.reader.readfrom(off, size)
74
75     def stale(self):
76         return False
77
78
79 class ObjectFile(File):
80     '''Wraps a dict as a serialized json object.'''
81
82     def __init__(self, parent_inode, contents):
83         super(ObjectFile, self).__init__(parent_inode)
84         self.contentsdict = contents
85         self.uuid = self.contentsdict['uuid']
86         self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
87
88     def size(self):
89         return len(self.contents)
90
91     def readfrom(self, off, size):
92         return self.contents[off:(off+size)]
93
94
95 class Directory(FreshBase):
96     '''Generic directory object, backed by a dict.
97     Consists of a set of entries with the key representing the filename
98     and the value referencing a File or Directory object.
99     '''
100
101     def __init__(self, parent_inode):
102         super(Directory, self).__init__()
103
104         '''parent_inode is the integer inode number'''
105         self.inode = None
106         if not isinstance(parent_inode, int):
107             raise Exception("parent_inode should be an int")
108         self.parent_inode = parent_inode
109         self._entries = {}
110
111     #  Overriden by subclasses to implement logic to update the entries dict
112     #  when the directory is stale
113     def update(self):
114         pass
115
116     # Only used when computing the size of the disk footprint of the directory
117     # (stub)
118     def size(self):
119         return 0
120
121     def checkupdate(self):
122         if self.stale():
123             try:
124                 self.update()
125             except apiclient.errors.HttpError as e:
126                 print e
127
128     def __getitem__(self, item):
129         self.checkupdate()
130         return self._entries[item]
131
132     def items(self):
133         self.checkupdate()
134         return self._entries.items()
135
136     def __iter__(self):
137         self.checkupdate()
138         return self._entries.iterkeys()
139
140     def __contains__(self, k):
141         self.checkupdate()
142         return k in self._entries
143
144     def merge(self, items, fn, same, new_entry):
145         '''Helper method for updating the contents of the directory.
146
147         items: array with new directory contents
148
149         fn: function to take an entry in 'items' and return the desired file or
150         directory name
151
152         same: function to compare an existing entry with an entry in the items
153         list to determine whether to keep the existing entry.
154
155         new_entry: function to create a new directory entry from array entry.
156         '''
157
158         oldentries = self._entries
159         self._entries = {}
160         for i in items:
161             n = fn(i)
162             if n in oldentries and same(oldentries[n], i):
163                 self._entries[n] = oldentries[n]
164                 del oldentries[n]
165             else:
166                 self._entries[n] = self.inodes.add_entry(new_entry(i))
167         for n in oldentries:
168             llfuse.invalidate_entry(self.inode, str(n))
169             self.inodes.del_entry(oldentries[n])
170         self.fresh()
171
172
173 class CollectionDirectory(Directory):
174     '''Represents the root of a directory tree holding a collection.'''
175
176     def __init__(self, parent_inode, inodes, collection_locator):
177         super(CollectionDirectory, self).__init__(parent_inode)
178         self.inodes = inodes
179         self.collection_locator = collection_locator
180
181     def same(self, i):
182         return i['uuid'] == self.collection_locator
183
184     def update(self):
185         collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
186         for s in collection.all_streams():
187             cwd = self
188             for part in s.name().split('/'):
189                 if part != '' and part != '.':
190                     if part not in cwd._entries:
191                         cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
192                     cwd = cwd._entries[part]
193             for k, v in s.files().items():
194                 cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
195         self.fresh()
196
197
198 class MagicDirectory(Directory):
199     '''A special directory that logically contains the set of all extant keep
200     locators.  When a file is referenced by lookup(), it is tested to see if it
201     is a valid keep locator to a manifest, and if so, loads the manifest
202     contents as a subdirectory of this directory with the locator as the
203     directory name.  Since querying a list of all extant keep locators is
204     impractical, only collections that have already been accessed are visible
205     to readdir().
206     '''
207
208     def __init__(self, parent_inode, inodes):
209         super(MagicDirectory, self).__init__(parent_inode)
210         self.inodes = inodes
211
212     def __contains__(self, k):
213         if k in self._entries:
214             return True
215         try:
216             if arvados.Keep.get(k):
217                 return True
218             else:
219                 return False
220         except Exception as e:
221             #print 'exception keep', e
222             return False
223
224     def __getitem__(self, item):
225         if item not in self._entries:
226             self._entries[item] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, item))
227         return self._entries[item]
228
229
230 class TagsDirectory(Directory):
231     '''A special directory that contains as subdirectories all tags visible to the user.'''
232
233     def __init__(self, parent_inode, inodes, api, poll_time=60):
234         super(TagsDirectory, self).__init__(parent_inode)
235         self.inodes = inodes
236         self.api = api
237         try:
238             arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
239         except:
240             self._poll = True
241             self._poll_time = poll_time
242
243     def invalidate(self):
244         with llfuse.lock:
245             super(TagsDirectory, self).invalidate()
246             for a in self._entries:
247                 self._entries[a].invalidate()
248
249     def update(self):
250         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
251         self.merge(tags['items'],
252                    lambda i: i['name'],
253                    lambda a, i: a.tag == i,
254                    lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
255
256 class TagDirectory(Directory):
257     '''A special directory that contains as subdirectories all collections visible
258     to the user that are tagged with a particular tag.
259     '''
260
261     def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
262         super(TagDirectory, self).__init__(parent_inode)
263         self.inodes = inodes
264         self.api = api
265         self.tag = tag
266         self._poll = poll
267         self._poll_time = poll_time
268
269     def update(self):
270         taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
271                                                ['name', '=', self.tag],
272                                                ['head_uuid', 'is_a', 'arvados#collection']],
273                                       select=['head_uuid']).execute()
274         self.merge(taggedcollections['items'],
275                    lambda i: i['head_uuid'],
276                    lambda a, i: a.collection_locator == i['head_uuid'],
277                    lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
278
279
280 class GroupsDirectory(Directory):
281     '''A special directory that contains as subdirectories all groups visible to the user.'''
282
283     def __init__(self, parent_inode, inodes, api, poll_time=60):
284         super(GroupsDirectory, self).__init__(parent_inode)
285         self.inodes = inodes
286         self.api = api
287         try:
288             arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
289         except:
290             self._poll = True
291             self._poll_time = poll_time
292
293     def invalidate(self):
294         with llfuse.lock:
295             super(GroupsDirectory, self).invalidate()
296             for a in self._entries:
297                 self._entries[a].invalidate()
298
299     def update(self):
300         groups = self.api.groups().list().execute()
301         self.merge(groups['items'],
302                    lambda i: i['uuid'],
303                    lambda a, i: a.uuid == i['uuid'],
304                    lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
305
306
307 class GroupDirectory(Directory):
308     '''A special directory that contains the contents of a group.'''
309
310     def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
311         super(GroupDirectory, self).__init__(parent_inode)
312         self.inodes = inodes
313         self.api = api
314         self.uuid = uuid['uuid']
315         self._poll = poll
316         self._poll_time = poll_time
317
318     def invalidate(self):
319         with llfuse.lock:
320             super(GroupDirectory, self).invalidate()
321             for a in self._entries:
322                 self._entries[a].invalidate()
323
324     def createDirectory(self, i):
325         if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
326             return CollectionDirectory(self.inode, self.inodes, i['uuid'])
327         elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
328             return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
329         elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
330             return ObjectFile(self.parent_inode, i)
331         return None
332
333     def update(self):
334         contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
335         links = {}
336         for a in contents['links']:
337             links[a['head_uuid']] = a['name']
338
339         def choose_name(i):
340             if i['uuid'] in links:
341                 return links[i['uuid']]
342             else:
343                 return i['uuid']
344
345         def same(a, i):
346             if isinstance(a, CollectionDirectory):
347                 return a.collection_locator == i['uuid']
348             elif isinstance(a, GroupDirectory):
349                 return a.uuid == i['uuid']
350             elif isinstance(a, ObjectFile):
351                 return a.uuid == i['uuid'] and not a.stale()
352             return False
353
354         self.merge(contents['items'],
355                    choose_name,
356                    same,
357                    self.createDirectory)
358
359
360 class FileHandle(object):
361     '''Connects a numeric file handle to a File or Directory object that has
362     been opened by the client.'''
363
364     def __init__(self, fh, entry):
365         self.fh = fh
366         self.entry = entry
367
368
369 class Inodes(object):
370     '''Manage the set of inodes.  This is the mapping from a numeric id
371     to a concrete File or Directory object'''
372
373     def __init__(self):
374         self._entries = {}
375         self._counter = llfuse.ROOT_INODE
376
377     def __getitem__(self, item):
378         return self._entries[item]
379
380     def __setitem__(self, key, item):
381         self._entries[key] = item
382
383     def __iter__(self):
384         return self._entries.iterkeys()
385
386     def items(self):
387         return self._entries.items()
388
389     def __contains__(self, k):
390         return k in self._entries
391
392     def add_entry(self, entry):
393         entry.inode = self._counter
394         self._entries[entry.inode] = entry
395         self._counter += 1
396         return entry
397
398     def del_entry(self, entry):
399         llfuse.invalidate_inode(entry.inode)
400         del self._entries[entry.inode]
401
402 class Operations(llfuse.Operations):
403     '''This is the main interface with llfuse.  The methods on this object are
404     called by llfuse threads to service FUSE events to query and read from
405     the file system.
406
407     llfuse has its own global lock which is acquired before calling a request handler,
408     so request handlers do not run concurrently unless the lock is explicitly released
409     with llfuse.lock_released.'''
410
411     def __init__(self, uid, gid):
412         super(Operations, self).__init__()
413
414         self.inodes = Inodes()
415         self.uid = uid
416         self.gid = gid
417
418         # dict of inode to filehandle
419         self._filehandles = {}
420         self._filehandles_counter = 1
421
422         # Other threads that need to wait until the fuse driver
423         # is fully initialized should wait() on this event object.
424         self.initlock = threading.Event()
425
426     def init(self):
427         # Allow threads that are waiting for the driver to be finished
428         # initializing to continue
429         self.initlock.set()
430
431     def access(self, inode, mode, ctx):
432         return True
433
434     def getattr(self, inode):
435         if inode not in self.inodes:
436             raise llfuse.FUSEError(errno.ENOENT)
437
438         e = self.inodes[inode]
439
440         entry = llfuse.EntryAttributes()
441         entry.st_ino = inode
442         entry.generation = 0
443         entry.entry_timeout = 300
444         entry.attr_timeout = 300
445
446         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
447         if isinstance(e, Directory):
448             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
449         else:
450             entry.st_mode |= stat.S_IFREG
451
452         entry.st_nlink = 1
453         entry.st_uid = self.uid
454         entry.st_gid = self.gid
455         entry.st_rdev = 0
456
457         entry.st_size = e.size()
458
459         entry.st_blksize = 1024
460         entry.st_blocks = e.size()/1024
461         if e.size()/1024 != 0:
462             entry.st_blocks += 1
463         entry.st_atime = 0
464         entry.st_mtime = 0
465         entry.st_ctime = 0
466
467         return entry
468
469     def lookup(self, parent_inode, name):
470         #print "lookup: parent_inode", parent_inode, "name", name
471         inode = None
472
473         if name == '.':
474             inode = parent_inode
475         else:
476             if parent_inode in self.inodes:
477                 p = self.inodes[parent_inode]
478                 if name == '..':
479                     inode = p.parent_inode
480                 elif name in p:
481                     inode = p[name].inode
482
483         if inode != None:
484             return self.getattr(inode)
485         else:
486             raise llfuse.FUSEError(errno.ENOENT)
487
488     def open(self, inode, flags):
489         if inode in self.inodes:
490             p = self.inodes[inode]
491         else:
492             raise llfuse.FUSEError(errno.ENOENT)
493
494         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
495             raise llfuse.FUSEError(errno.EROFS)
496
497         if isinstance(p, Directory):
498             raise llfuse.FUSEError(errno.EISDIR)
499
500         fh = self._filehandles_counter
501         self._filehandles_counter += 1
502         self._filehandles[fh] = FileHandle(fh, p)
503         return fh
504
505     def read(self, fh, off, size):
506         #print "read", fh, off, size
507         if fh in self._filehandles:
508             handle = self._filehandles[fh]
509         else:
510             raise llfuse.FUSEError(errno.EBADF)
511
512         try:
513             with llfuse.lock_released:
514                 return handle.entry.readfrom(off, size)
515         except:
516             raise llfuse.FUSEError(errno.EIO)
517
518     def release(self, fh):
519         if fh in self._filehandles:
520             del self._filehandles[fh]
521
522     def opendir(self, inode):
523         #print "opendir: inode", inode
524
525         if inode in self.inodes:
526             p = self.inodes[inode]
527         else:
528             raise llfuse.FUSEError(errno.ENOENT)
529
530         if not isinstance(p, Directory):
531             raise llfuse.FUSEError(errno.ENOTDIR)
532
533         fh = self._filehandles_counter
534         self._filehandles_counter += 1
535         if p.parent_inode in self.inodes:
536             parent = self.inodes[p.parent_inode]
537         else:
538             raise llfuse.FUSEError(errno.EIO)
539
540         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
541         return fh
542
543     def readdir(self, fh, off):
544         #print "readdir: fh", fh, "off", off
545
546         if fh in self._filehandles:
547             handle = self._filehandles[fh]
548         else:
549             raise llfuse.FUSEError(errno.EBADF)
550
551         #print "handle.entry", handle.entry
552
553         e = off
554         while e < len(handle.entry):
555             if handle.entry[e][1].inode in self.inodes:
556                 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
557             e += 1
558
559     def releasedir(self, fh):
560         del self._filehandles[fh]
561
562     def statfs(self):
563         st = llfuse.StatvfsData()
564         st.f_bsize = 1024 * 1024
565         st.f_blocks = 0
566         st.f_files = 0
567
568         st.f_bfree = 0
569         st.f_bavail = 0
570
571         st.f_ffree = 0
572         st.f_favail = 0
573
574         st.f_frsize = 0
575         return st
576
577     # The llfuse documentation recommends only overloading functions that
578     # are actually implemented, as the default implementation will raise ENOSYS.
579     # However, there is a bug in the llfuse default implementation of create()
580     # "create() takes exactly 5 positional arguments (6 given)" which will crash
581     # arv-mount.
582     # The workaround is to implement it with the proper number of parameters,
583     # and then everything works out.
584     def create(self, p1, p2, p3, p4, p5):
585         raise llfuse.FUSEError(errno.EROFS)