Merge branch '3123-pipeline-report-job-submit-errors' closes #3123
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7
8 import llfuse
9 import errno
10 import stat
11 import threading
12 import arvados
13 import pprint
14 import arvados.events
15 import re
16 import apiclient
17 import json
18 import logging
19
20 from time import time
21 from llfuse import FUSEError
22
23 class FreshBase(object):
24     '''Base class for maintaining fresh/stale state to determine when to update.'''
25     def __init__(self):
26         self._stale = True
27         self._poll = False
28         self._last_update = time()
29         self._poll_time = 60
30
31     # Mark the value as stale
32     def invalidate(self):
33         self._stale = True
34
35     # Test if the entries dict is stale
36     def stale(self):
37         if self._stale:
38             return True
39         if self._poll:
40             return (self._last_update + self._poll_time) < time()
41         return False
42
43     def fresh(self):
44         self._stale = False
45         self._last_update = time()
46
47
48 class File(FreshBase):
49     '''Base for file objects.'''
50
51     def __init__(self, parent_inode):
52         super(File, self).__init__()
53         self.inode = None
54         self.parent_inode = parent_inode
55
56     def size(self):
57         return 0
58
59     def readfrom(self, off, size):
60         return ''
61
62
63 class StreamReaderFile(File):
64     '''Wraps a StreamFileReader as a file.'''
65
66     def __init__(self, parent_inode, reader):
67         super(StreamReaderFile, self).__init__(parent_inode)
68         self.reader = reader
69
70     def size(self):
71         return self.reader.size()
72
73     def readfrom(self, off, size):
74         return self.reader.readfrom(off, size)
75
76     def stale(self):
77         return False
78
79
80 class ObjectFile(File):
81     '''Wraps a dict as a serialized json object.'''
82
83     def __init__(self, parent_inode, contents):
84         super(ObjectFile, self).__init__(parent_inode)
85         self.contentsdict = contents
86         self.uuid = self.contentsdict['uuid']
87         self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
88
89     def size(self):
90         return len(self.contents)
91
92     def readfrom(self, off, size):
93         return self.contents[off:(off+size)]
94
95
96 class Directory(FreshBase):
97     '''Generic directory object, backed by a dict.
98     Consists of a set of entries with the key representing the filename
99     and the value referencing a File or Directory object.
100     '''
101
102     def __init__(self, parent_inode):
103         super(Directory, self).__init__()
104
105         '''parent_inode is the integer inode number'''
106         self.inode = None
107         if not isinstance(parent_inode, int):
108             raise Exception("parent_inode should be an int")
109         self.parent_inode = parent_inode
110         self._entries = {}
111
112     #  Overriden by subclasses to implement logic to update the entries dict
113     #  when the directory is stale
114     def update(self):
115         pass
116
117     # Only used when computing the size of the disk footprint of the directory
118     # (stub)
119     def size(self):
120         return 0
121
122     def checkupdate(self):
123         if self.stale():
124             try:
125                 self.update()
126             except apiclient.errors.HttpError as e:
127                 logging.debug(e)
128
129     def __getitem__(self, item):
130         self.checkupdate()
131         return self._entries[item]
132
133     def items(self):
134         self.checkupdate()
135         return self._entries.items()
136
137     def __iter__(self):
138         self.checkupdate()
139         return self._entries.iterkeys()
140
141     def __contains__(self, k):
142         self.checkupdate()
143         return k in self._entries
144
145     def merge(self, items, fn, same, new_entry):
146         '''Helper method for updating the contents of the directory.
147
148         items: array with new directory contents
149
150         fn: function to take an entry in 'items' and return the desired file or
151         directory name
152
153         same: function to compare an existing entry with an entry in the items
154         list to determine whether to keep the existing entry.
155
156         new_entry: function to create a new directory entry from array entry.
157         '''
158
159         oldentries = self._entries
160         self._entries = {}
161         for i in items:
162             n = fn(i)
163             if n in oldentries and same(oldentries[n], i):
164                 self._entries[n] = oldentries[n]
165                 del oldentries[n]
166             else:
167                 self._entries[n] = self.inodes.add_entry(new_entry(i))
168         for n in oldentries:
169             llfuse.invalidate_entry(self.inode, str(n))
170             self.inodes.del_entry(oldentries[n])
171         self.fresh()
172
173
174 class CollectionDirectory(Directory):
175     '''Represents the root of a directory tree holding a collection.'''
176
177     def __init__(self, parent_inode, inodes, collection_locator):
178         super(CollectionDirectory, self).__init__(parent_inode)
179         self.inodes = inodes
180         self.collection_locator = collection_locator
181
182     def same(self, i):
183         return i['uuid'] == self.collection_locator
184
185     def update(self):
186         try:
187             collection = arvados.CollectionReader(self.collection_locator)
188             for s in collection.all_streams():
189                 cwd = self
190                 for part in s.name().split('/'):
191                     if part != '' and part != '.':
192                         if part not in cwd._entries:
193                             cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
194                         cwd = cwd._entries[part]
195                 for k, v in s.files().items():
196                     cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
197             self.fresh()
198             return True
199         except Exception as detail:
200             logging.debug("arv-mount %s: error: %s" % (self.collection_locator,detail))
201             return False
202
203 class MagicDirectory(Directory):
204     '''A special directory that logically contains the set of all extant keep
205     locators.  When a file is referenced by lookup(), it is tested to see if it
206     is a valid keep locator to a manifest, and if so, loads the manifest
207     contents as a subdirectory of this directory with the locator as the
208     directory name.  Since querying a list of all extant keep locators is
209     impractical, only collections that have already been accessed are visible
210     to readdir().
211     '''
212
213     def __init__(self, parent_inode, inodes):
214         super(MagicDirectory, self).__init__(parent_inode)
215         self.inodes = inodes
216
217     def __contains__(self, k):
218         if k in self._entries:
219             return True
220         try:
221             e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, k))
222             if e.update():
223                 self._entries[k] = e
224                 return True
225             else:
226                 return False
227         except Exception as e:
228             logging.debug('arv-mount exception keep %s', e)
229             return False
230
231     def __getitem__(self, item):
232         if item in self:
233             return self._entries[item]
234         else:
235             raise KeyError("No collection with id " + item)
236
237 class TagsDirectory(Directory):
238     '''A special directory that contains as subdirectories all tags visible to the user.'''
239
240     def __init__(self, parent_inode, inodes, api, poll_time=60):
241         super(TagsDirectory, self).__init__(parent_inode)
242         self.inodes = inodes
243         self.api = api
244         try:
245             arvados.events.subscribe(self.api, [['object_uuid', 'is_a', 'arvados#link']], lambda ev: self.invalidate())
246         except:
247             self._poll = True
248             self._poll_time = poll_time
249
250     def invalidate(self):
251         with llfuse.lock:
252             super(TagsDirectory, self).invalidate()
253             for a in self._entries:
254                 self._entries[a].invalidate()
255
256     def update(self):
257         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
258         if "items" in tags:
259             self.merge(tags['items'],
260                        lambda i: i['name'],
261                        lambda a, i: a.tag == i,
262                        lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
263
264 class TagDirectory(Directory):
265     '''A special directory that contains as subdirectories all collections visible
266     to the user that are tagged with a particular tag.
267     '''
268
269     def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
270         super(TagDirectory, self).__init__(parent_inode)
271         self.inodes = inodes
272         self.api = api
273         self.tag = tag
274         self._poll = poll
275         self._poll_time = poll_time
276
277     def update(self):
278         taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
279                                                ['name', '=', self.tag],
280                                                ['head_uuid', 'is_a', 'arvados#collection']],
281                                       select=['head_uuid']).execute()
282         self.merge(taggedcollections['items'],
283                    lambda i: i['head_uuid'],
284                    lambda a, i: a.collection_locator == i['head_uuid'],
285                    lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
286
287
288 class GroupsDirectory(Directory):
289     '''A special directory that contains as subdirectories all groups visible to the user.'''
290
291     def __init__(self, parent_inode, inodes, api, poll_time=60):
292         super(GroupsDirectory, self).__init__(parent_inode)
293         self.inodes = inodes
294         self.api = api
295         try:
296             arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
297         except:
298             self._poll = True
299             self._poll_time = poll_time
300
301     def invalidate(self):
302         with llfuse.lock:
303             super(GroupsDirectory, self).invalidate()
304             for a in self._entries:
305                 self._entries[a].invalidate()
306
307     def update(self):
308         groups = self.api.groups().list().execute()
309         self.merge(groups['items'],
310                    lambda i: i['uuid'],
311                    lambda a, i: a.uuid == i['uuid'],
312                    lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
313
314
315 class GroupDirectory(Directory):
316     '''A special directory that contains the contents of a group.'''
317
318     def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
319         super(GroupDirectory, self).__init__(parent_inode)
320         self.inodes = inodes
321         self.api = api
322         self.uuid = uuid['uuid']
323         self._poll = poll
324         self._poll_time = poll_time
325
326     def invalidate(self):
327         with llfuse.lock:
328             super(GroupDirectory, self).invalidate()
329             for a in self._entries:
330                 self._entries[a].invalidate()
331
332     def createDirectory(self, i):
333         if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
334             return CollectionDirectory(self.inode, self.inodes, i['uuid'])
335         elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
336             return GroupDirectory(self.parent_inode, self.inodes, self.api, i, self._poll, self._poll_time)
337         elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
338             return ObjectFile(self.parent_inode, i)
339         return None
340
341     def update(self):
342         contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
343         links = {}
344         for a in contents['links']:
345             links[a['head_uuid']] = a['name']
346
347         def choose_name(i):
348             if i['uuid'] in links:
349                 return links[i['uuid']]
350             else:
351                 return i['uuid']
352
353         def same(a, i):
354             if isinstance(a, CollectionDirectory):
355                 return a.collection_locator == i['uuid']
356             elif isinstance(a, GroupDirectory):
357                 return a.uuid == i['uuid']
358             elif isinstance(a, ObjectFile):
359                 return a.uuid == i['uuid'] and not a.stale()
360             return False
361
362         self.merge(contents['items'],
363                    choose_name,
364                    same,
365                    self.createDirectory)
366
367
368 class FileHandle(object):
369     '''Connects a numeric file handle to a File or Directory object that has
370     been opened by the client.'''
371
372     def __init__(self, fh, entry):
373         self.fh = fh
374         self.entry = entry
375
376
377 class Inodes(object):
378     '''Manage the set of inodes.  This is the mapping from a numeric id
379     to a concrete File or Directory object'''
380
381     def __init__(self):
382         self._entries = {}
383         self._counter = llfuse.ROOT_INODE
384
385     def __getitem__(self, item):
386         return self._entries[item]
387
388     def __setitem__(self, key, item):
389         self._entries[key] = item
390
391     def __iter__(self):
392         return self._entries.iterkeys()
393
394     def items(self):
395         return self._entries.items()
396
397     def __contains__(self, k):
398         return k in self._entries
399
400     def add_entry(self, entry):
401         entry.inode = self._counter
402         self._entries[entry.inode] = entry
403         self._counter += 1
404         return entry
405
406     def del_entry(self, entry):
407         llfuse.invalidate_inode(entry.inode)
408         del self._entries[entry.inode]
409
410 class Operations(llfuse.Operations):
411     '''This is the main interface with llfuse.  The methods on this object are
412     called by llfuse threads to service FUSE events to query and read from
413     the file system.
414
415     llfuse has its own global lock which is acquired before calling a request handler,
416     so request handlers do not run concurrently unless the lock is explicitly released
417     with llfuse.lock_released.'''
418
419     def __init__(self, uid, gid, debug=False):
420         super(Operations, self).__init__()
421
422         if debug:
423             logging.basicConfig(level=logging.DEBUG)
424             logging.info("arv-mount debug enabled")
425
426         self.inodes = Inodes()
427         self.uid = uid
428         self.gid = gid
429
430         # dict of inode to filehandle
431         self._filehandles = {}
432         self._filehandles_counter = 1
433
434         # Other threads that need to wait until the fuse driver
435         # is fully initialized should wait() on this event object.
436         self.initlock = threading.Event()
437
438     def init(self):
439         # Allow threads that are waiting for the driver to be finished
440         # initializing to continue
441         self.initlock.set()
442
443     def access(self, inode, mode, ctx):
444         return True
445
446     def getattr(self, inode):
447         if inode not in self.inodes:
448             raise llfuse.FUSEError(errno.ENOENT)
449
450         e = self.inodes[inode]
451
452         entry = llfuse.EntryAttributes()
453         entry.st_ino = inode
454         entry.generation = 0
455         entry.entry_timeout = 300
456         entry.attr_timeout = 300
457
458         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
459         if isinstance(e, Directory):
460             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
461         else:
462             entry.st_mode |= stat.S_IFREG
463
464         entry.st_nlink = 1
465         entry.st_uid = self.uid
466         entry.st_gid = self.gid
467         entry.st_rdev = 0
468
469         entry.st_size = e.size()
470
471         entry.st_blksize = 1024
472         entry.st_blocks = e.size()/1024
473         if e.size()/1024 != 0:
474             entry.st_blocks += 1
475         entry.st_atime = 0
476         entry.st_mtime = 0
477         entry.st_ctime = 0
478
479         return entry
480
481     def lookup(self, parent_inode, name):
482         logging.debug("arv-mount lookup: parent_inode %i name %s", parent_inode, name)
483         inode = None
484
485         if name == '.':
486             inode = parent_inode
487         else:
488             if parent_inode in self.inodes:
489                 p = self.inodes[parent_inode]
490                 if name == '..':
491                     inode = p.parent_inode
492                 elif name in p:
493                     inode = p[name].inode
494
495         if inode != None:
496             return self.getattr(inode)
497         else:
498             raise llfuse.FUSEError(errno.ENOENT)
499
500     def open(self, inode, flags):
501         if inode in self.inodes:
502             p = self.inodes[inode]
503         else:
504             raise llfuse.FUSEError(errno.ENOENT)
505
506         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
507             raise llfuse.FUSEError(errno.EROFS)
508
509         if isinstance(p, Directory):
510             raise llfuse.FUSEError(errno.EISDIR)
511
512         fh = self._filehandles_counter
513         self._filehandles_counter += 1
514         self._filehandles[fh] = FileHandle(fh, p)
515         return fh
516
517     def read(self, fh, off, size):
518         logging.debug("arv-mount read %i %i %i", fh, off, size)
519         if fh in self._filehandles:
520             handle = self._filehandles[fh]
521         else:
522             raise llfuse.FUSEError(errno.EBADF)
523
524         try:
525             with llfuse.lock_released:
526                 return handle.entry.readfrom(off, size)
527         except:
528             raise llfuse.FUSEError(errno.EIO)
529
530     def release(self, fh):
531         if fh in self._filehandles:
532             del self._filehandles[fh]
533
534     def opendir(self, inode):
535         logging.debug("arv-mount opendir: inode %i", inode)
536
537         if inode in self.inodes:
538             p = self.inodes[inode]
539         else:
540             raise llfuse.FUSEError(errno.ENOENT)
541
542         if not isinstance(p, Directory):
543             raise llfuse.FUSEError(errno.ENOTDIR)
544
545         fh = self._filehandles_counter
546         self._filehandles_counter += 1
547         if p.parent_inode in self.inodes:
548             parent = self.inodes[p.parent_inode]
549         else:
550             raise llfuse.FUSEError(errno.EIO)
551
552         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
553         return fh
554
555     def readdir(self, fh, off):
556         logging.debug("arv-mount readdir: fh %i off %i", fh, off)
557
558         if fh in self._filehandles:
559             handle = self._filehandles[fh]
560         else:
561             raise llfuse.FUSEError(errno.EBADF)
562
563         logging.debug("arv-mount handle.entry %s", handle.entry)
564
565         e = off
566         while e < len(handle.entry):
567             if handle.entry[e][1].inode in self.inodes:
568                 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
569             e += 1
570
571     def releasedir(self, fh):
572         del self._filehandles[fh]
573
574     def statfs(self):
575         st = llfuse.StatvfsData()
576         st.f_bsize = 1024 * 1024
577         st.f_blocks = 0
578         st.f_files = 0
579
580         st.f_bfree = 0
581         st.f_bavail = 0
582
583         st.f_ffree = 0
584         st.f_favail = 0
585
586         st.f_frsize = 0
587         return st
588
589     # The llfuse documentation recommends only overloading functions that
590     # are actually implemented, as the default implementation will raise ENOSYS.
591     # However, there is a bug in the llfuse default implementation of create()
592     # "create() takes exactly 5 positional arguments (6 given)" which will crash
593     # arv-mount.
594     # The workaround is to implement it with the proper number of parameters,
595     # and then everything works out.
596     def create(self, p1, p2, p3, p4, p5):
597         raise llfuse.FUSEError(errno.EROFS)