Improved fresh/stale handling with base class, added property fuse inode cache
[arvados.git] / sdk / python / arvados / fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7
8 import llfuse
9 import errno
10 import stat
11 import threading
12 import arvados
13 import pprint
14 import arvados.events
15 import re
16 import apiclient
17 import json
18
19 from time import time
20 from llfuse import FUSEError
21
22 class FreshBase(object):
23     '''Base class for maintaining fresh/stale state to determine when to update.'''
24     def __init__(self):
25         self._stale = True
26         self._poll = False
27         self._last_update = time()
28         self._poll_time = 60
29
30     # Mark the value as stale
31     def invalidate(self):
32         self._stale = True
33
34     # Test if the entries dict is stale
35     def stale(self):
36         if self._stale:
37             return True
38         if self._poll:
39             return (self._last_update + self._poll_time) < time()
40         return False
41
42     def fresh(self):
43         self._stale = False
44         self._last_update = time()
45
46
47 class File(FreshBase):
48     '''Base for file objects.'''
49
50     def __init__(self, parent_inode):
51         super(File, self).__init__()
52         self.inode = None
53         self.parent_inode = parent_inode
54
55     def size(self):
56         return 0
57
58     def readfrom(self, off, size):
59         return ''
60
61
62 class StreamReaderFile(File):
63     '''Wraps a StreamFileReader as a file.'''
64
65     def __init__(self, parent_inode, reader):
66         super(StreamReaderFile, self).__init__(parent_inode)
67         self.reader = reader
68
69     def size(self):
70         return self.reader.size()
71
72     def readfrom(self, off, size):
73         return self.reader.readfrom(off, size)
74
75     def stale(self):
76         return False
77
78
79 class ObjectFile(File):
80     '''Wraps a dict as a serialized json object.'''
81
82     def __init__(self, parent_inode, contents):
83         super(ObjectFile, self).__init__(parent_inode)
84         self.contentsdict = contents
85         self.uuid = self.contentsdict['uuid']
86         self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
87
88     def size(self):
89         return len(self.contents)
90
91     def readfrom(self, off, size):
92         return self.contents[off:(off+size)]
93
94
95 class Directory(FreshBase):
96     '''Generic directory object, backed by a dict.
97     Consists of a set of entries with the key representing the filename
98     and the value referencing a File or Directory object.
99     '''
100
101     def __init__(self, parent_inode):
102         super(Directory, self).__init__()
103
104         '''parent_inode is the integer inode number'''
105         self.inode = None
106         if not isinstance(parent_inode, int):
107             raise Exception("parent_inode should be an int")
108         self.parent_inode = parent_inode
109         self._entries = {}
110
111     #  Overriden by subclasses to implement logic to update the entries dict
112     #  when the directory is stale
113     def update(self):
114         pass
115
116     # Only used when computing the size of the disk footprint of the directory
117     # (stub)
118     def size(self):
119         return 0
120
121     def checkupdate(self):
122         if self.stale():
123             try:
124                 self.update()
125             except apiclient.errors.HttpError as e:
126                 print e
127
128     def __getitem__(self, item):
129         self.checkupdate()
130         return self._entries[item]
131
132     def items(self):
133         self.checkupdate()
134         return self._entries.items()
135
136     def __iter__(self):
137         self.checkupdate()
138         return self._entries.iterkeys()
139
140     def __contains__(self, k):
141         self.checkupdate()
142         return k in self._entries
143
144     def merge(self, items, fn, same, new_entry):
145         '''Helper method for updating the contents of the directory.
146
147         items: array with new directory contents
148
149         fn: function to take an entry in 'items' and return the desired file or
150         directory name
151
152         same: function to compare an existing entry with an entry in the items
153         list to determine whether to keep the existing entry.
154
155         new_entry: function to create a new directory entry from array entry.
156         '''
157
158         oldentries = self._entries
159         self._entries = {}
160         for i in items:
161             n = fn(i)
162             if n in oldentries and same(oldentries[n], i):
163                 self._entries[n] = oldentries[n]
164                 del oldentries[n]
165             else:
166                 self._entries[n] = self.inodes.add_entry(new_entry(i))
167         for n in oldentries:
168             llfuse.invalidate_entry(self.inode, str(n))
169             self.inodes.del_entry(oldentries[n])
170         self.fresh()
171
172
173 class CollectionDirectory(Directory):
174     '''Represents the root of a directory tree holding a collection.'''
175
176     def __init__(self, parent_inode, inodes, collection_locator):
177         super(CollectionDirectory, self).__init__(parent_inode)
178         self.inodes = inodes
179         self.collection_locator = collection_locator
180
181     def same(self, i):
182         return i['uuid'] == self.collection_locator
183
184     def update(self):
185         collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
186         for s in collection.all_streams():
187             cwd = self
188             for part in s.name().split('/'):
189                 if part != '' and part != '.':
190                     if part not in cwd._entries:
191                         cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
192                     cwd = cwd._entries[part]
193             for k, v in s.files().items():
194                 cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
195         self.fresh()
196
197
198 class MagicDirectory(Directory):
199     '''A special directory that logically contains the set of all extant keep
200     locators.  When a file is referenced by lookup(), it is tested to see if it
201     is a valid keep locator to a manifest, and if so, loads the manifest
202     contents as a subdirectory of this directory with the locator as the
203     directory name.  Since querying a list of all extant keep locators is
204     impractical, only collections that have already been accessed are visible
205     to readdir().
206     '''
207
208     def __init__(self, parent_inode, inodes):
209         super(MagicDirectory, self).__init__(parent_inode)
210         self.inodes = inodes
211
212     def __contains__(self, k):
213         if k in self._entries:
214             return True
215         try:
216             if arvados.Keep.get(k):
217                 return True
218             else:
219                 return False
220         except Exception as e:
221             #print 'exception keep', e
222             return False
223
224     def __getitem__(self, item):
225         if item not in self._entries:
226             self._entries[item] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, item))
227         return self._entries[item]
228
229
230 class TagsDirectory(Directory):
231     '''A special directory that contains as subdirectories all tags visible to the user.'''
232
233     def __init__(self, parent_inode, inodes, api, poll_time=60):
234         super(TagsDirectory, self).__init__(parent_inode)
235         self.inodes = inodes
236         self.api = api
237         try:
238             arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
239         except:
240             self._poll = True
241             self._poll_time = poll_time
242
243     def invalidate(self):
244         with llfuse.lock:
245             super(TagsDirectory, self).invalidate()
246             for a in self._entries:
247                 self._entries[a].invalidate()
248
249     def update(self):
250         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
251         self.merge(tags['items'],
252                    lambda i: i['name'],
253                    lambda a, i: a.tag == i,
254                    lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
255
256 class TagDirectory(Directory):
257     '''A special directory that contains as subdirectories all collections visible
258     to the user that are tagged with a particular tag.
259     '''
260
261     def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
262         super(TagDirectory, self).__init__(parent_inode)
263         self.inodes = inodes
264         self.api = api
265         self.tag = tag
266         self._poll = poll
267         self._poll_time = poll_time
268
269     def update(self):
270         taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
271                                                ['name', '=', self.tag],
272                                                ['head_uuid', 'is_a', 'arvados#collection']],
273                                       select=['head_uuid']).execute()
274         self.merge(taggedcollections['items'],
275                    lambda i: i['head_uuid'],
276                    lambda a, i: a.collection_locator == i['head_uuid'],
277                    lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
278
279
280 class GroupsDirectory(Directory):
281     '''A special directory that contains as subdirectories all groups visible to the user.'''
282
283     def __init__(self, parent_inode, inodes, api, poll_time=60):
284         super(GroupsDirectory, self).__init__(parent_inode)
285         self.inodes = inodes
286         self.api = api
287         try:
288             arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
289         except:
290             self._poll = True
291             self._poll_time = poll_time
292
293     def invalidate(self):
294         with llfuse.lock:
295             super(GroupsDirectory, self).invalidate()
296             for a in self._entries:
297                 self._entries[a].invalidate()
298
299     def update(self):
300         groups = self.api.groups().list().execute()
301         self.merge(groups['items'],
302                    lambda i: i['uuid'],
303                    lambda a, i: a.uuid == i['uuid'],
304                    lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
305
306
307 class GroupDirectory(Directory):
308     '''A special directory that contains the contents of a group.'''
309
310     def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
311         super(GroupDirectory, self).__init__(parent_inode)
312         self.inodes = inodes
313         self.api = api
314         self.uuid = uuid['uuid']
315         self._poll = poll
316         self._poll_time = poll_time
317
318     def invalidate(self):
319         with llfuse.lock:
320             super(GroupDirectory, self).invalidate()
321             for a in self._entries:
322                 self._entries[a].invalidate()
323
324     def createDirectory(self, i):
325         if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
326             return CollectionDirectory(self.inode, self.inodes, i['uuid'])
327         elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
328             return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
329         elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
330             return ObjectFile(self.parent_inode, i)
331         return None
332
333     def update(self):
334         contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
335         links = {}
336         for a in contents['links']:
337             links[a['head_uuid']] = a['name']
338
339         def choose_name(i):
340             if i['uuid'] in links:
341                 return links[i['uuid']]
342             else:
343                 return i['uuid']
344
345         def same(a, i):
346             if isinstance(a, CollectionDirectory):
347                 return a.collection_locator == i['uuid']
348             elif isinstance(a, ObjectFile):
349                 return a.uuid == i['uuid'] and not a.stale()
350             return False
351
352         self.merge(contents['items'],
353                    choose_name,
354                    same,
355                    self.createDirectory)
356
357
358 class FileHandle(object):
359     '''Connects a numeric file handle to a File or Directory object that has
360     been opened by the client.'''
361
362     def __init__(self, fh, entry):
363         self.fh = fh
364         self.entry = entry
365
366
367 class Inodes(object):
368     '''Manage the set of inodes.  This is the mapping from a numeric id
369     to a concrete File or Directory object'''
370
371     def __init__(self):
372         self._entries = {}
373         self._counter = llfuse.ROOT_INODE
374
375     def __getitem__(self, item):
376         return self._entries[item]
377
378     def __setitem__(self, key, item):
379         self._entries[key] = item
380
381     def __iter__(self):
382         return self._entries.iterkeys()
383
384     def items(self):
385         return self._entries.items()
386
387     def __contains__(self, k):
388         return k in self._entries
389
390     def add_entry(self, entry):
391         entry.inode = self._counter
392         self._entries[entry.inode] = entry
393         self._counter += 1
394         return entry
395
396     def del_entry(self, entry):
397         llfuse.invalidate_inode(entry.inode)
398         del self._entries[entry.inode]
399
400 class Operations(llfuse.Operations):
401     '''This is the main interface with llfuse.  The methods on this object are
402     called by llfuse threads to service FUSE events to query and read from
403     the file system.
404
405     llfuse has its own global lock which is acquired before calling a request handler,
406     so request handlers do not run concurrently unless the lock is explicitly released
407     with llfuse.lock_released.'''
408
409     def __init__(self, uid, gid):
410         super(Operations, self).__init__()
411
412         self.inodes = Inodes()
413         self.uid = uid
414         self.gid = gid
415
416         # dict of inode to filehandle
417         self._filehandles = {}
418         self._filehandles_counter = 1
419
420         # Other threads that need to wait until the fuse driver
421         # is fully initialized should wait() on this event object.
422         self.initlock = threading.Event()
423
424     def init(self):
425         # Allow threads that are waiting for the driver to be finished
426         # initializing to continue
427         self.initlock.set()
428
429     def access(self, inode, mode, ctx):
430         return True
431
432     def getattr(self, inode):
433         if inode not in self.inodes:
434             raise llfuse.FUSEError(errno.ENOENT)
435
436         e = self.inodes[inode]
437
438         entry = llfuse.EntryAttributes()
439         entry.st_ino = inode
440         entry.generation = 0
441         entry.entry_timeout = 300
442         entry.attr_timeout = 300
443
444         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
445         if isinstance(e, Directory):
446             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
447         else:
448             entry.st_mode |= stat.S_IFREG
449
450         entry.st_nlink = 1
451         entry.st_uid = self.uid
452         entry.st_gid = self.gid
453         entry.st_rdev = 0
454
455         entry.st_size = e.size()
456
457         entry.st_blksize = 1024
458         entry.st_blocks = e.size()/1024
459         if e.size()/1024 != 0:
460             entry.st_blocks += 1
461         entry.st_atime = 0
462         entry.st_mtime = 0
463         entry.st_ctime = 0
464
465         return entry
466
467     def lookup(self, parent_inode, name):
468         #print "lookup: parent_inode", parent_inode, "name", name
469         inode = None
470
471         if name == '.':
472             inode = parent_inode
473         else:
474             if parent_inode in self.inodes:
475                 p = self.inodes[parent_inode]
476                 if name == '..':
477                     inode = p.parent_inode
478                 elif name in p:
479                     inode = p[name].inode
480
481         if inode != None:
482             return self.getattr(inode)
483         else:
484             raise llfuse.FUSEError(errno.ENOENT)
485
486     def open(self, inode, flags):
487         if inode in self.inodes:
488             p = self.inodes[inode]
489         else:
490             raise llfuse.FUSEError(errno.ENOENT)
491
492         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
493             raise llfuse.FUSEError(errno.EROFS)
494
495         if isinstance(p, Directory):
496             raise llfuse.FUSEError(errno.EISDIR)
497
498         fh = self._filehandles_counter
499         self._filehandles_counter += 1
500         self._filehandles[fh] = FileHandle(fh, p)
501         return fh
502
503     def read(self, fh, off, size):
504         #print "read", fh, off, size
505         if fh in self._filehandles:
506             handle = self._filehandles[fh]
507         else:
508             raise llfuse.FUSEError(errno.EBADF)
509
510         try:
511             with llfuse.lock_released:
512                 return handle.entry.readfrom(off, size)
513         except:
514             raise llfuse.FUSEError(errno.EIO)
515
516     def release(self, fh):
517         if fh in self._filehandles:
518             del self._filehandles[fh]
519
520     def opendir(self, inode):
521         #print "opendir: inode", inode
522
523         if inode in self.inodes:
524             p = self.inodes[inode]
525         else:
526             raise llfuse.FUSEError(errno.ENOENT)
527
528         if not isinstance(p, Directory):
529             raise llfuse.FUSEError(errno.ENOTDIR)
530
531         fh = self._filehandles_counter
532         self._filehandles_counter += 1
533         if p.parent_inode in self.inodes:
534             parent = self.inodes[p.parent_inode]
535         else:
536             raise llfuse.FUSEError(errno.EIO)
537
538         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
539         return fh
540
541     def readdir(self, fh, off):
542         #print "readdir: fh", fh, "off", off
543
544         if fh in self._filehandles:
545             handle = self._filehandles[fh]
546         else:
547             raise llfuse.FUSEError(errno.EBADF)
548
549         #print "handle.entry", handle.entry
550
551         e = off
552         while e < len(handle.entry):
553             if handle.entry[e][1].inode in self.inodes:
554                 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
555             e += 1
556
557     def releasedir(self, fh):
558         del self._filehandles[fh]
559
560     def statfs(self):
561         st = llfuse.StatvfsData()
562         st.f_bsize = 1024 * 1024
563         st.f_blocks = 0
564         st.f_files = 0
565
566         st.f_bfree = 0
567         st.f_bavail = 0
568
569         st.f_ffree = 0
570         st.f_favail = 0
571
572         st.f_frsize = 0
573         return st
574
575     # The llfuse documentation recommends only overloading functions that
576     # are actually implemented, as the default implementation will raise ENOSYS.
577     # However, there is a bug in the llfuse default implementation of create()
578     # "create() takes exactly 5 positional arguments (6 given)" which will crash
579     # arv-mount.
580     # The workaround is to implement it with the proper number of parameters,
581     # and then everything works out.
582     def create(self, p1, p2, p3, p4, p5):
583         raise llfuse.FUSEError(errno.EROFS)