2755: Defer to CollectionReader to get manifests instead of going
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7
8 import llfuse
9 import errno
10 import stat
11 import threading
12 import arvados
13 import pprint
14 import arvados.events
15 import re
16 import apiclient
17 import json
18
19 from time import time
20 from llfuse import FUSEError
21
22 class FreshBase(object):
23     '''Base class for maintaining fresh/stale state to determine when to update.'''
24     def __init__(self):
25         self._stale = True
26         self._poll = False
27         self._last_update = time()
28         self._poll_time = 60
29
30     # Mark the value as stale
31     def invalidate(self):
32         self._stale = True
33
34     # Test if the entries dict is stale
35     def stale(self):
36         if self._stale:
37             return True
38         if self._poll:
39             return (self._last_update + self._poll_time) < time()
40         return False
41
42     def fresh(self):
43         self._stale = False
44         self._last_update = time()
45
46
47 class File(FreshBase):
48     '''Base for file objects.'''
49
50     def __init__(self, parent_inode):
51         super(File, self).__init__()
52         self.inode = None
53         self.parent_inode = parent_inode
54
55     def size(self):
56         return 0
57
58     def readfrom(self, off, size):
59         return ''
60
61
62 class StreamReaderFile(File):
63     '''Wraps a StreamFileReader as a file.'''
64
65     def __init__(self, parent_inode, reader):
66         super(StreamReaderFile, self).__init__(parent_inode)
67         self.reader = reader
68
69     def size(self):
70         return self.reader.size()
71
72     def readfrom(self, off, size):
73         return self.reader.readfrom(off, size)
74
75     def stale(self):
76         return False
77
78
79 class ObjectFile(File):
80     '''Wraps a dict as a serialized json object.'''
81
82     def __init__(self, parent_inode, contents):
83         super(ObjectFile, self).__init__(parent_inode)
84         self.contentsdict = contents
85         self.uuid = self.contentsdict['uuid']
86         self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
87
88     def size(self):
89         return len(self.contents)
90
91     def readfrom(self, off, size):
92         return self.contents[off:(off+size)]
93
94
95 class Directory(FreshBase):
96     '''Generic directory object, backed by a dict.
97     Consists of a set of entries with the key representing the filename
98     and the value referencing a File or Directory object.
99     '''
100
101     def __init__(self, parent_inode):
102         super(Directory, self).__init__()
103
104         '''parent_inode is the integer inode number'''
105         self.inode = None
106         if not isinstance(parent_inode, int):
107             raise Exception("parent_inode should be an int")
108         self.parent_inode = parent_inode
109         self._entries = {}
110
111     #  Overriden by subclasses to implement logic to update the entries dict
112     #  when the directory is stale
113     def update(self):
114         pass
115
116     # Only used when computing the size of the disk footprint of the directory
117     # (stub)
118     def size(self):
119         return 0
120
121     def checkupdate(self):
122         if self.stale():
123             try:
124                 self.update()
125             except apiclient.errors.HttpError as e:
126                 print e
127
128     def __getitem__(self, item):
129         self.checkupdate()
130         return self._entries[item]
131
132     def items(self):
133         self.checkupdate()
134         return self._entries.items()
135
136     def __iter__(self):
137         self.checkupdate()
138         return self._entries.iterkeys()
139
140     def __contains__(self, k):
141         self.checkupdate()
142         return k in self._entries
143
144     def merge(self, items, fn, same, new_entry):
145         '''Helper method for updating the contents of the directory.
146
147         items: array with new directory contents
148
149         fn: function to take an entry in 'items' and return the desired file or
150         directory name
151
152         same: function to compare an existing entry with an entry in the items
153         list to determine whether to keep the existing entry.
154
155         new_entry: function to create a new directory entry from array entry.
156         '''
157
158         oldentries = self._entries
159         self._entries = {}
160         for i in items:
161             n = fn(i)
162             if n in oldentries and same(oldentries[n], i):
163                 self._entries[n] = oldentries[n]
164                 del oldentries[n]
165             else:
166                 self._entries[n] = self.inodes.add_entry(new_entry(i))
167         for n in oldentries:
168             llfuse.invalidate_entry(self.inode, str(n))
169             self.inodes.del_entry(oldentries[n])
170         self.fresh()
171
172
173 class CollectionDirectory(Directory):
174     '''Represents the root of a directory tree holding a collection.'''
175
176     def __init__(self, parent_inode, inodes, collection_locator):
177         super(CollectionDirectory, self).__init__(parent_inode)
178         self.inodes = inodes
179         self.collection_locator = collection_locator
180
181     def same(self, i):
182         return i['uuid'] == self.collection_locator
183
184     def update(self):
185         try:
186             collection = arvados.CollectionReader(self.collection_locator)
187             for s in collection.all_streams():
188                 cwd = self
189                 for part in s.name().split('/'):
190                     if part != '' and part != '.':
191                         if part not in cwd._entries:
192                             cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
193                         cwd = cwd._entries[part]
194                 for k, v in s.files().items():
195                     cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
196             print "found"
197             self.fresh()
198         except Exception as detail:
199             print("%s: error: %s" % (self.collection_locator,detail) )
200
201 class MagicDirectory(Directory):
202     '''A special directory that logically contains the set of all extant keep
203     locators.  When a file is referenced by lookup(), it is tested to see if it
204     is a valid keep locator to a manifest, and if so, loads the manifest
205     contents as a subdirectory of this directory with the locator as the
206     directory name.  Since querying a list of all extant keep locators is
207     impractical, only collections that have already been accessed are visible
208     to readdir().
209     '''
210
211     def __init__(self, parent_inode, inodes):
212         super(MagicDirectory, self).__init__(parent_inode)
213         self.inodes = inodes
214
215     def __contains__(self, k):
216         if k in self._entries:
217             return True
218         try:
219             if arvados.Keep.get(k):
220                 return True
221             else:
222                 return False
223         except Exception as e:
224             #print 'exception keep', e
225             return False
226
227     def __getitem__(self, item):
228         if item not in self._entries:
229             self._entries[item] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, item))
230         return self._entries[item]
231
232
233 class TagsDirectory(Directory):
234     '''A special directory that contains as subdirectories all tags visible to the user.'''
235
236     def __init__(self, parent_inode, inodes, api, poll_time=60):
237         super(TagsDirectory, self).__init__(parent_inode)
238         self.inodes = inodes
239         self.api = api
240         try:
241             arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
242         except:
243             self._poll = True
244             self._poll_time = poll_time
245
246     def invalidate(self):
247         with llfuse.lock:
248             super(TagsDirectory, self).invalidate()
249             for a in self._entries:
250                 self._entries[a].invalidate()
251
252     def update(self):
253         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
254         self.merge(tags['items'],
255                    lambda i: i['name'],
256                    lambda a, i: a.tag == i,
257                    lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
258
259 class TagDirectory(Directory):
260     '''A special directory that contains as subdirectories all collections visible
261     to the user that are tagged with a particular tag.
262     '''
263
264     def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
265         super(TagDirectory, self).__init__(parent_inode)
266         self.inodes = inodes
267         self.api = api
268         self.tag = tag
269         self._poll = poll
270         self._poll_time = poll_time
271
272     def update(self):
273         taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
274                                                ['name', '=', self.tag],
275                                                ['head_uuid', 'is_a', 'arvados#collection']],
276                                       select=['head_uuid']).execute()
277         self.merge(taggedcollections['items'],
278                    lambda i: i['head_uuid'],
279                    lambda a, i: a.collection_locator == i['head_uuid'],
280                    lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
281
282
283 class GroupsDirectory(Directory):
284     '''A special directory that contains as subdirectories all groups visible to the user.'''
285
286     def __init__(self, parent_inode, inodes, api, poll_time=60):
287         super(GroupsDirectory, self).__init__(parent_inode)
288         self.inodes = inodes
289         self.api = api
290         try:
291             arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
292         except:
293             self._poll = True
294             self._poll_time = poll_time
295
296     def invalidate(self):
297         with llfuse.lock:
298             super(GroupsDirectory, self).invalidate()
299             for a in self._entries:
300                 self._entries[a].invalidate()
301
302     def update(self):
303         groups = self.api.groups().list().execute()
304         self.merge(groups['items'],
305                    lambda i: i['uuid'],
306                    lambda a, i: a.uuid == i['uuid'],
307                    lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
308
309
310 class GroupDirectory(Directory):
311     '''A special directory that contains the contents of a group.'''
312
313     def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
314         super(GroupDirectory, self).__init__(parent_inode)
315         self.inodes = inodes
316         self.api = api
317         self.uuid = uuid['uuid']
318         self._poll = poll
319         self._poll_time = poll_time
320
321     def invalidate(self):
322         with llfuse.lock:
323             super(GroupDirectory, self).invalidate()
324             for a in self._entries:
325                 self._entries[a].invalidate()
326
327     def createDirectory(self, i):
328         if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
329             return CollectionDirectory(self.inode, self.inodes, i['uuid'])
330         elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
331             return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
332         elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
333             return ObjectFile(self.parent_inode, i)
334         return None
335
336     def update(self):
337         contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
338         links = {}
339         for a in contents['links']:
340             links[a['head_uuid']] = a['name']
341
342         def choose_name(i):
343             if i['uuid'] in links:
344                 return links[i['uuid']]
345             else:
346                 return i['uuid']
347
348         def same(a, i):
349             if isinstance(a, CollectionDirectory):
350                 return a.collection_locator == i['uuid']
351             elif isinstance(a, GroupDirectory):
352                 return a.uuid == i['uuid']
353             elif isinstance(a, ObjectFile):
354                 return a.uuid == i['uuid'] and not a.stale()
355             return False
356
357         self.merge(contents['items'],
358                    choose_name,
359                    same,
360                    self.createDirectory)
361
362
363 class FileHandle(object):
364     '''Connects a numeric file handle to a File or Directory object that has
365     been opened by the client.'''
366
367     def __init__(self, fh, entry):
368         self.fh = fh
369         self.entry = entry
370
371
372 class Inodes(object):
373     '''Manage the set of inodes.  This is the mapping from a numeric id
374     to a concrete File or Directory object'''
375
376     def __init__(self):
377         self._entries = {}
378         self._counter = llfuse.ROOT_INODE
379
380     def __getitem__(self, item):
381         return self._entries[item]
382
383     def __setitem__(self, key, item):
384         self._entries[key] = item
385
386     def __iter__(self):
387         return self._entries.iterkeys()
388
389     def items(self):
390         return self._entries.items()
391
392     def __contains__(self, k):
393         return k in self._entries
394
395     def add_entry(self, entry):
396         entry.inode = self._counter
397         self._entries[entry.inode] = entry
398         self._counter += 1
399         return entry
400
401     def del_entry(self, entry):
402         llfuse.invalidate_inode(entry.inode)
403         del self._entries[entry.inode]
404
405 class Operations(llfuse.Operations):
406     '''This is the main interface with llfuse.  The methods on this object are
407     called by llfuse threads to service FUSE events to query and read from
408     the file system.
409
410     llfuse has its own global lock which is acquired before calling a request handler,
411     so request handlers do not run concurrently unless the lock is explicitly released
412     with llfuse.lock_released.'''
413
414     def __init__(self, uid, gid):
415         super(Operations, self).__init__()
416
417         self.inodes = Inodes()
418         self.uid = uid
419         self.gid = gid
420
421         # dict of inode to filehandle
422         self._filehandles = {}
423         self._filehandles_counter = 1
424
425         # Other threads that need to wait until the fuse driver
426         # is fully initialized should wait() on this event object.
427         self.initlock = threading.Event()
428
429     def init(self):
430         # Allow threads that are waiting for the driver to be finished
431         # initializing to continue
432         self.initlock.set()
433
434     def access(self, inode, mode, ctx):
435         return True
436
437     def getattr(self, inode):
438         if inode not in self.inodes:
439             raise llfuse.FUSEError(errno.ENOENT)
440
441         e = self.inodes[inode]
442
443         entry = llfuse.EntryAttributes()
444         entry.st_ino = inode
445         entry.generation = 0
446         entry.entry_timeout = 300
447         entry.attr_timeout = 300
448
449         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
450         if isinstance(e, Directory):
451             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
452         else:
453             entry.st_mode |= stat.S_IFREG
454
455         entry.st_nlink = 1
456         entry.st_uid = self.uid
457         entry.st_gid = self.gid
458         entry.st_rdev = 0
459
460         entry.st_size = e.size()
461
462         entry.st_blksize = 1024
463         entry.st_blocks = e.size()/1024
464         if e.size()/1024 != 0:
465             entry.st_blocks += 1
466         entry.st_atime = 0
467         entry.st_mtime = 0
468         entry.st_ctime = 0
469
470         return entry
471
472     def lookup(self, parent_inode, name):
473         #print "lookup: parent_inode", parent_inode, "name", name
474         inode = None
475
476         if name == '.':
477             inode = parent_inode
478         else:
479             if parent_inode in self.inodes:
480                 p = self.inodes[parent_inode]
481                 if name == '..':
482                     inode = p.parent_inode
483                 elif name in p:
484                     inode = p[name].inode
485
486         if inode != None:
487             return self.getattr(inode)
488         else:
489             raise llfuse.FUSEError(errno.ENOENT)
490
491     def open(self, inode, flags):
492         if inode in self.inodes:
493             p = self.inodes[inode]
494         else:
495             raise llfuse.FUSEError(errno.ENOENT)
496
497         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
498             raise llfuse.FUSEError(errno.EROFS)
499
500         if isinstance(p, Directory):
501             raise llfuse.FUSEError(errno.EISDIR)
502
503         fh = self._filehandles_counter
504         self._filehandles_counter += 1
505         self._filehandles[fh] = FileHandle(fh, p)
506         return fh
507
508     def read(self, fh, off, size):
509         #print "read", fh, off, size
510         if fh in self._filehandles:
511             handle = self._filehandles[fh]
512         else:
513             raise llfuse.FUSEError(errno.EBADF)
514
515         try:
516             with llfuse.lock_released:
517                 return handle.entry.readfrom(off, size)
518         except:
519             raise llfuse.FUSEError(errno.EIO)
520
521     def release(self, fh):
522         if fh in self._filehandles:
523             del self._filehandles[fh]
524
525     def opendir(self, inode):
526         #print "opendir: inode", inode
527
528         if inode in self.inodes:
529             p = self.inodes[inode]
530         else:
531             raise llfuse.FUSEError(errno.ENOENT)
532
533         if not isinstance(p, Directory):
534             raise llfuse.FUSEError(errno.ENOTDIR)
535
536         fh = self._filehandles_counter
537         self._filehandles_counter += 1
538         if p.parent_inode in self.inodes:
539             parent = self.inodes[p.parent_inode]
540         else:
541             raise llfuse.FUSEError(errno.EIO)
542
543         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
544         return fh
545
546     def readdir(self, fh, off):
547         #print "readdir: fh", fh, "off", off
548
549         if fh in self._filehandles:
550             handle = self._filehandles[fh]
551         else:
552             raise llfuse.FUSEError(errno.EBADF)
553
554         #print "handle.entry", handle.entry
555
556         e = off
557         while e < len(handle.entry):
558             if handle.entry[e][1].inode in self.inodes:
559                 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
560             e += 1
561
562     def releasedir(self, fh):
563         del self._filehandles[fh]
564
565     def statfs(self):
566         st = llfuse.StatvfsData()
567         st.f_bsize = 1024 * 1024
568         st.f_blocks = 0
569         st.f_files = 0
570
571         st.f_bfree = 0
572         st.f_bavail = 0
573
574         st.f_ffree = 0
575         st.f_favail = 0
576
577         st.f_frsize = 0
578         return st
579
580     # The llfuse documentation recommends only overloading functions that
581     # are actually implemented, as the default implementation will raise ENOSYS.
582     # However, there is a bug in the llfuse default implementation of create()
583     # "create() takes exactly 5 positional arguments (6 given)" which will crash
584     # arv-mount.
585     # The workaround is to implement it with the proper number of parameters,
586     # and then everything works out.
587     def create(self, p1, p2, p3, p4, p5):
588         raise llfuse.FUSEError(errno.EROFS)