Merge branch 'master' into 3505-virtual-work-dir
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7
8 import llfuse
9 import errno
10 import stat
11 import threading
12 import arvados
13 import pprint
14 import arvados.events
15 import re
16 import apiclient
17 import json
18 import logging
19
20 _logger = logging.getLogger('arvados.arvados_fuse')
21
22 from time import time
23 from llfuse import FUSEError
24
25 class FreshBase(object):
26     '''Base class for maintaining fresh/stale state to determine when to update.'''
27     def __init__(self):
28         self._stale = True
29         self._poll = False
30         self._last_update = time()
31         self._poll_time = 60
32
33     # Mark the value as stale
34     def invalidate(self):
35         self._stale = True
36
37     # Test if the entries dict is stale
38     def stale(self):
39         if self._stale:
40             return True
41         if self._poll:
42             return (self._last_update + self._poll_time) < time()
43         return False
44
45     def fresh(self):
46         self._stale = False
47         self._last_update = time()
48
49
50 class File(FreshBase):
51     '''Base for file objects.'''
52
53     def __init__(self, parent_inode):
54         super(File, self).__init__()
55         self.inode = None
56         self.parent_inode = parent_inode
57
58     def size(self):
59         return 0
60
61     def readfrom(self, off, size):
62         return ''
63
64
65 class StreamReaderFile(File):
66     '''Wraps a StreamFileReader as a file.'''
67
68     def __init__(self, parent_inode, reader):
69         super(StreamReaderFile, self).__init__(parent_inode)
70         self.reader = reader
71
72     def size(self):
73         return self.reader.size()
74
75     def readfrom(self, off, size):
76         return self.reader.readfrom(off, size)
77
78     def stale(self):
79         return False
80
81
82 class ObjectFile(File):
83     '''Wraps a dict as a serialized json object.'''
84
85     def __init__(self, parent_inode, contents):
86         super(ObjectFile, self).__init__(parent_inode)
87         self.contentsdict = contents
88         self.uuid = self.contentsdict['uuid']
89         self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
90
91     def size(self):
92         return len(self.contents)
93
94     def readfrom(self, off, size):
95         return self.contents[off:(off+size)]
96
97
98 class Directory(FreshBase):
99     '''Generic directory object, backed by a dict.
100     Consists of a set of entries with the key representing the filename
101     and the value referencing a File or Directory object.
102     '''
103
104     def __init__(self, parent_inode):
105         super(Directory, self).__init__()
106
107         '''parent_inode is the integer inode number'''
108         self.inode = None
109         if not isinstance(parent_inode, int):
110             raise Exception("parent_inode should be an int")
111         self.parent_inode = parent_inode
112         self._entries = {}
113
114     #  Overriden by subclasses to implement logic to update the entries dict
115     #  when the directory is stale
116     def update(self):
117         pass
118
119     # Only used when computing the size of the disk footprint of the directory
120     # (stub)
121     def size(self):
122         return 0
123
124     def checkupdate(self):
125         if self.stale():
126             try:
127                 self.update()
128             except apiclient.errors.HttpError as e:
129                 _logger.debug(e)
130
131     def __getitem__(self, item):
132         self.checkupdate()
133         return self._entries[item]
134
135     def items(self):
136         self.checkupdate()
137         return self._entries.items()
138
139     def __iter__(self):
140         self.checkupdate()
141         return self._entries.iterkeys()
142
143     def __contains__(self, k):
144         self.checkupdate()
145         return k in self._entries
146
147     def merge(self, items, fn, same, new_entry):
148         '''Helper method for updating the contents of the directory.
149
150         items: array with new directory contents
151
152         fn: function to take an entry in 'items' and return the desired file or
153         directory name
154
155         same: function to compare an existing entry with an entry in the items
156         list to determine whether to keep the existing entry.
157
158         new_entry: function to create a new directory entry from array entry.
159         '''
160
161         oldentries = self._entries
162         self._entries = {}
163         for i in items:
164             n = fn(i)
165             if n in oldentries and same(oldentries[n], i):
166                 self._entries[n] = oldentries[n]
167                 del oldentries[n]
168             else:
169                 self._entries[n] = self.inodes.add_entry(new_entry(i))
170         for n in oldentries:
171             llfuse.invalidate_entry(self.inode, str(n))
172             self.inodes.del_entry(oldentries[n])
173         self.fresh()
174
175
176 class CollectionDirectory(Directory):
177     '''Represents the root of a directory tree holding a collection.'''
178
179     def __init__(self, parent_inode, inodes, collection_locator):
180         super(CollectionDirectory, self).__init__(parent_inode)
181         self.inodes = inodes
182         self.collection_locator = collection_locator
183
184     def same(self, i):
185         return i['uuid'] == self.collection_locator
186
187     def update(self):
188         try:
189             collection = arvados.CollectionReader(self.collection_locator)
190             for s in collection.all_streams():
191                 cwd = self
192                 for part in s.name().split('/'):
193                     if part != '' and part != '.':
194                         if part not in cwd._entries:
195                             cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
196                         cwd = cwd._entries[part]
197                 for k, v in s.files().items():
198                     cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
199             self.fresh()
200             return True
201         except Exception as detail:
202             _logger.debug("arv-mount %s: error: %s",
203                           self.collection_locator, detail)
204             return False
205
206 class MagicDirectory(Directory):
207     '''A special directory that logically contains the set of all extant keep
208     locators.  When a file is referenced by lookup(), it is tested to see if it
209     is a valid keep locator to a manifest, and if so, loads the manifest
210     contents as a subdirectory of this directory with the locator as the
211     directory name.  Since querying a list of all extant keep locators is
212     impractical, only collections that have already been accessed are visible
213     to readdir().
214     '''
215
216     def __init__(self, parent_inode, inodes):
217         super(MagicDirectory, self).__init__(parent_inode)
218         self.inodes = inodes
219
220     def __contains__(self, k):
221         if k in self._entries:
222             return True
223         try:
224             e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, k))
225             if e.update():
226                 self._entries[k] = e
227                 return True
228             else:
229                 return False
230         except Exception as e:
231             _logger.debug('arv-mount exception keep %s', e)
232             return False
233
234     def __getitem__(self, item):
235         if item in self:
236             return self._entries[item]
237         else:
238             raise KeyError("No collection with id " + item)
239
240 class TagsDirectory(Directory):
241     '''A special directory that contains as subdirectories all tags visible to the user.'''
242
243     def __init__(self, parent_inode, inodes, api, poll_time=60):
244         super(TagsDirectory, self).__init__(parent_inode)
245         self.inodes = inodes
246         self.api = api
247         try:
248             arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
249         except:
250             self._poll = True
251             self._poll_time = poll_time
252
253     def invalidate(self):
254         with llfuse.lock:
255             super(TagsDirectory, self).invalidate()
256             for a in self._entries:
257                 self._entries[a].invalidate()
258
259     def update(self):
260         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
261         if "items" in tags:
262             self.merge(tags['items'],
263                        lambda i: i['name'],
264                        lambda a, i: a.tag == i,
265                        lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
266
267 class TagDirectory(Directory):
268     '''A special directory that contains as subdirectories all collections visible
269     to the user that are tagged with a particular tag.
270     '''
271
272     def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
273         super(TagDirectory, self).__init__(parent_inode)
274         self.inodes = inodes
275         self.api = api
276         self.tag = tag
277         self._poll = poll
278         self._poll_time = poll_time
279
280     def update(self):
281         taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
282                                                ['name', '=', self.tag],
283                                                ['head_uuid', 'is_a', 'arvados#collection']],
284                                       select=['head_uuid']).execute()
285         self.merge(taggedcollections['items'],
286                    lambda i: i['head_uuid'],
287                    lambda a, i: a.collection_locator == i['head_uuid'],
288                    lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
289
290
291 class GroupsDirectory(Directory):
292     '''A special directory that contains as subdirectories all groups visible to the user.'''
293
294     def __init__(self, parent_inode, inodes, api, poll_time=60):
295         super(GroupsDirectory, self).__init__(parent_inode)
296         self.inodes = inodes
297         self.api = api
298         try:
299             arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
300         except:
301             self._poll = True
302             self._poll_time = poll_time
303
304     def invalidate(self):
305         with llfuse.lock:
306             super(GroupsDirectory, self).invalidate()
307             for a in self._entries:
308                 self._entries[a].invalidate()
309
310     def update(self):
311         groups = self.api.groups().list(
312             filters=[['group_class','=','project']]).execute()
313         self.merge(groups['items'],
314                    lambda i: i['uuid'],
315                    lambda a, i: a.uuid == i['uuid'],
316                    lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
317
318
319 class GroupDirectory(Directory):
320     '''A special directory that contains the contents of a group.'''
321
322     def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
323         super(GroupDirectory, self).__init__(parent_inode)
324         self.inodes = inodes
325         self.api = api
326         self.uuid = uuid['uuid']
327         self._poll = poll
328         self._poll_time = poll_time
329
330     def invalidate(self):
331         with llfuse.lock:
332             super(GroupDirectory, self).invalidate()
333             for a in self._entries:
334                 self._entries[a].invalidate()
335
336     def createDirectory(self, i):
337         if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
338             return CollectionDirectory(self.inode, self.inodes, i['uuid'])
339         elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
340             return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
341         elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
342             return ObjectFile(self.parent_inode, i)
343         return None
344
345     def update(self):
346         contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
347         links = {}
348         for a in contents['links']:
349             links[a['head_uuid']] = a['name']
350
351         def choose_name(i):
352             if i['uuid'] in links:
353                 return links[i['uuid']]
354             else:
355                 return i['uuid']
356
357         def same(a, i):
358             if isinstance(a, CollectionDirectory):
359                 return a.collection_locator == i['uuid']
360             elif isinstance(a, GroupDirectory):
361                 return a.uuid == i['uuid']
362             elif isinstance(a, ObjectFile):
363                 return a.uuid == i['uuid'] and not a.stale()
364             return False
365
366         self.merge(contents['items'],
367                    choose_name,
368                    same,
369                    self.createDirectory)
370
371
372 class FileHandle(object):
373     '''Connects a numeric file handle to a File or Directory object that has
374     been opened by the client.'''
375
376     def __init__(self, fh, entry):
377         self.fh = fh
378         self.entry = entry
379
380
381 class Inodes(object):
382     '''Manage the set of inodes.  This is the mapping from a numeric id
383     to a concrete File or Directory object'''
384
385     def __init__(self):
386         self._entries = {}
387         self._counter = llfuse.ROOT_INODE
388
389     def __getitem__(self, item):
390         return self._entries[item]
391
392     def __setitem__(self, key, item):
393         self._entries[key] = item
394
395     def __iter__(self):
396         return self._entries.iterkeys()
397
398     def items(self):
399         return self._entries.items()
400
401     def __contains__(self, k):
402         return k in self._entries
403
404     def add_entry(self, entry):
405         entry.inode = self._counter
406         self._entries[entry.inode] = entry
407         self._counter += 1
408         return entry
409
410     def del_entry(self, entry):
411         llfuse.invalidate_inode(entry.inode)
412         del self._entries[entry.inode]
413
414 class Operations(llfuse.Operations):
415     '''This is the main interface with llfuse.  The methods on this object are
416     called by llfuse threads to service FUSE events to query and read from
417     the file system.
418
419     llfuse has its own global lock which is acquired before calling a request handler,
420     so request handlers do not run concurrently unless the lock is explicitly released
421     with llfuse.lock_released.'''
422
423     def __init__(self, uid, gid):
424         super(Operations, self).__init__()
425
426         self.inodes = Inodes()
427         self.uid = uid
428         self.gid = gid
429
430         # dict of inode to filehandle
431         self._filehandles = {}
432         self._filehandles_counter = 1
433
434         # Other threads that need to wait until the fuse driver
435         # is fully initialized should wait() on this event object.
436         self.initlock = threading.Event()
437
438     def init(self):
439         # Allow threads that are waiting for the driver to be finished
440         # initializing to continue
441         self.initlock.set()
442
443     def access(self, inode, mode, ctx):
444         return True
445
446     def getattr(self, inode):
447         if inode not in self.inodes:
448             raise llfuse.FUSEError(errno.ENOENT)
449
450         e = self.inodes[inode]
451
452         entry = llfuse.EntryAttributes()
453         entry.st_ino = inode
454         entry.generation = 0
455         entry.entry_timeout = 300
456         entry.attr_timeout = 300
457
458         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
459         if isinstance(e, Directory):
460             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
461         else:
462             entry.st_mode |= stat.S_IFREG
463
464         entry.st_nlink = 1
465         entry.st_uid = self.uid
466         entry.st_gid = self.gid
467         entry.st_rdev = 0
468
469         entry.st_size = e.size()
470
471         entry.st_blksize = 1024
472         entry.st_blocks = e.size()/1024
473         if e.size()/1024 != 0:
474             entry.st_blocks += 1
475         entry.st_atime = 0
476         entry.st_mtime = 0
477         entry.st_ctime = 0
478
479         return entry
480
481     def lookup(self, parent_inode, name):
482         _logger.debug("arv-mount lookup: parent_inode %i name %s",
483                       parent_inode, name)
484         inode = None
485
486         if name == '.':
487             inode = parent_inode
488         else:
489             if parent_inode in self.inodes:
490                 p = self.inodes[parent_inode]
491                 if name == '..':
492                     inode = p.parent_inode
493                 elif name in p:
494                     inode = p[name].inode
495
496         if inode != None:
497             return self.getattr(inode)
498         else:
499             raise llfuse.FUSEError(errno.ENOENT)
500
501     def open(self, inode, flags):
502         if inode in self.inodes:
503             p = self.inodes[inode]
504         else:
505             raise llfuse.FUSEError(errno.ENOENT)
506
507         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
508             raise llfuse.FUSEError(errno.EROFS)
509
510         if isinstance(p, Directory):
511             raise llfuse.FUSEError(errno.EISDIR)
512
513         fh = self._filehandles_counter
514         self._filehandles_counter += 1
515         self._filehandles[fh] = FileHandle(fh, p)
516         return fh
517
518     def read(self, fh, off, size):
519         _logger.debug("arv-mount read %i %i %i", fh, off, size)
520         if fh in self._filehandles:
521             handle = self._filehandles[fh]
522         else:
523             raise llfuse.FUSEError(errno.EBADF)
524
525         try:
526             with llfuse.lock_released:
527                 return handle.entry.readfrom(off, size)
528         except:
529             raise llfuse.FUSEError(errno.EIO)
530
531     def release(self, fh):
532         if fh in self._filehandles:
533             del self._filehandles[fh]
534
535     def opendir(self, inode):
536         _logger.debug("arv-mount opendir: inode %i", inode)
537
538         if inode in self.inodes:
539             p = self.inodes[inode]
540         else:
541             raise llfuse.FUSEError(errno.ENOENT)
542
543         if not isinstance(p, Directory):
544             raise llfuse.FUSEError(errno.ENOTDIR)
545
546         fh = self._filehandles_counter
547         self._filehandles_counter += 1
548         if p.parent_inode in self.inodes:
549             parent = self.inodes[p.parent_inode]
550         else:
551             raise llfuse.FUSEError(errno.EIO)
552
553         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
554         return fh
555
556     def readdir(self, fh, off):
557         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
558
559         if fh in self._filehandles:
560             handle = self._filehandles[fh]
561         else:
562             raise llfuse.FUSEError(errno.EBADF)
563
564         _logger.debug("arv-mount handle.entry %s", handle.entry)
565
566         e = off
567         while e < len(handle.entry):
568             if handle.entry[e][1].inode in self.inodes:
569                 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
570             e += 1
571
572     def releasedir(self, fh):
573         del self._filehandles[fh]
574
575     def statfs(self):
576         st = llfuse.StatvfsData()
577         st.f_bsize = 1024 * 1024
578         st.f_blocks = 0
579         st.f_files = 0
580
581         st.f_bfree = 0
582         st.f_bavail = 0
583
584         st.f_ffree = 0
585         st.f_favail = 0
586
587         st.f_frsize = 0
588         return st
589
590     # The llfuse documentation recommends only overloading functions that
591     # are actually implemented, as the default implementation will raise ENOSYS.
592     # However, there is a bug in the llfuse default implementation of create()
593     # "create() takes exactly 5 positional arguments (6 given)" which will crash
594     # arv-mount.
595     # The workaround is to implement it with the proper number of parameters,
596     # and then everything works out.
597     def create(self, p1, p2, p3, p4, p5):
598         raise llfuse.FUSEError(errno.EROFS)