Add 'apps/arv-web/' from commit 'f9732ad8460d013c2f28363655d0d1b91894dca5'
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
25
26 _logger = logging.getLogger('arvados.arvados_fuse')
27
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
32
33 class SafeApi(object):
34     '''Threadsafe wrapper for API object.  This stores and returns a different api
35     object per thread, because httplib2 which underlies apiclient is not
36     threadsafe.
37     '''
38
39     def __init__(self, config):
40         self.host = config.get('ARVADOS_API_HOST')
41         self.api_token = config.get('ARVADOS_API_TOKEN')
42         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
43         self.local = threading.local()
44         self.block_cache = arvados.KeepBlockCache()
45
46     def localapi(self):
47         if 'api' not in self.local.__dict__:
48             self.local.api = arvados.api('v1', False, self.host,
49                                          self.api_token, self.insecure)
50         return self.local.api
51
52     def localkeep(self):
53         if 'keep' not in self.local.__dict__:
54             self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
55         return self.local.keep
56
57     def __getattr__(self, name):
58         # Proxy nonexistent attributes to the local API client.
59         try:
60             return getattr(self.localapi(), name)
61         except AttributeError:
62             return super(SafeApi, self).__getattr__(name)
63
64
65 def convertTime(t):
66     '''Parse Arvados timestamp to unix time.'''
67     try:
68         return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
69     except (TypeError, ValueError):
70         return 0
71
72 def sanitize_filename(dirty):
73     '''Replace disallowed filename characters with harmless "_".'''
74     if dirty is None:
75         return None
76     elif dirty == '':
77         return '_'
78     elif dirty == '.':
79         return '_'
80     elif dirty == '..':
81         return '__'
82     else:
83         return _disallowed_filename_characters.sub('_', dirty)
84
85
86 class FreshBase(object):
87     '''Base class for maintaining fresh/stale state to determine when to update.'''
88     def __init__(self):
89         self._stale = True
90         self._poll = False
91         self._last_update = time.time()
92         self._atime = time.time()
93         self._poll_time = 60
94
95     # Mark the value as stale
96     def invalidate(self):
97         self._stale = True
98
99     # Test if the entries dict is stale.
100     def stale(self):
101         if self._stale:
102             return True
103         if self._poll:
104             return (self._last_update + self._poll_time) < self._atime
105         return False
106
107     def fresh(self):
108         self._stale = False
109         self._last_update = time.time()
110
111     def atime(self):
112         return self._atime
113
114 class File(FreshBase):
115     '''Base for file objects.'''
116
117     def __init__(self, parent_inode, _mtime=0):
118         super(File, self).__init__()
119         self.inode = None
120         self.parent_inode = parent_inode
121         self._mtime = _mtime
122
123     def size(self):
124         return 0
125
126     def readfrom(self, off, size):
127         return ''
128
129     def mtime(self):
130         return self._mtime
131
132
133 class StreamReaderFile(File):
134     '''Wraps a StreamFileReader as a file.'''
135
136     def __init__(self, parent_inode, reader, _mtime):
137         super(StreamReaderFile, self).__init__(parent_inode, _mtime)
138         self.reader = reader
139
140     def size(self):
141         return self.reader.size()
142
143     def readfrom(self, off, size):
144         return self.reader.readfrom(off, size)
145
146     def stale(self):
147         return False
148
149
150 class StringFile(File):
151     '''Wrap a simple string as a file'''
152     def __init__(self, parent_inode, contents, _mtime):
153         super(StringFile, self).__init__(parent_inode, _mtime)
154         self.contents = contents
155
156     def size(self):
157         return len(self.contents)
158
159     def readfrom(self, off, size):
160         return self.contents[off:(off+size)]
161
162
163 class ObjectFile(StringFile):
164     '''Wrap a dict as a serialized json object.'''
165
166     def __init__(self, parent_inode, obj):
167         super(ObjectFile, self).__init__(parent_inode, "", 0)
168         self.uuid = obj['uuid']
169         self.update(obj)
170
171     def update(self, obj):
172         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
173         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
174
175
176 class Directory(FreshBase):
177     '''Generic directory object, backed by a dict.
178     Consists of a set of entries with the key representing the filename
179     and the value referencing a File or Directory object.
180     '''
181
182     def __init__(self, parent_inode):
183         super(Directory, self).__init__()
184
185         '''parent_inode is the integer inode number'''
186         self.inode = None
187         if not isinstance(parent_inode, int):
188             raise Exception("parent_inode should be an int")
189         self.parent_inode = parent_inode
190         self._entries = {}
191         self._mtime = time.time()
192
193     #  Overriden by subclasses to implement logic to update the entries dict
194     #  when the directory is stale
195     def update(self):
196         pass
197
198     # Only used when computing the size of the disk footprint of the directory
199     # (stub)
200     def size(self):
201         return 0
202
203     def checkupdate(self):
204         if self.stale():
205             try:
206                 self.update()
207             except apiclient.errors.HttpError as e:
208                 _logger.debug(e)
209
210     def __getitem__(self, item):
211         self.checkupdate()
212         return self._entries[item]
213
214     def items(self):
215         self.checkupdate()
216         return self._entries.items()
217
218     def __iter__(self):
219         self.checkupdate()
220         return self._entries.iterkeys()
221
222     def __contains__(self, k):
223         self.checkupdate()
224         return k in self._entries
225
226     def merge(self, items, fn, same, new_entry):
227         '''Helper method for updating the contents of the directory.  Takes a list
228         describing the new contents of the directory, reuse entries that are
229         the same in both the old and new lists, create new entries, and delete
230         old entries missing from the new list.
231
232         items: iterable with new directory contents
233
234         fn: function to take an entry in 'items' and return the desired file or
235         directory name, or None if this entry should be skipped
236
237         same: function to compare an existing entry (a File or Directory
238         object) with an entry in the items list to determine whether to keep
239         the existing entry.
240
241         new_entry: function to create a new directory entry (File or Directory
242         object) from an entry in the items list.
243
244         '''
245
246         oldentries = self._entries
247         self._entries = {}
248         changed = False
249         for i in items:
250             name = sanitize_filename(fn(i))
251             if name:
252                 if name in oldentries and same(oldentries[name], i):
253                     # move existing directory entry over
254                     self._entries[name] = oldentries[name]
255                     del oldentries[name]
256                 else:
257                     # create new directory entry
258                     ent = new_entry(i)
259                     if ent is not None:
260                         self._entries[name] = self.inodes.add_entry(ent)
261                         changed = True
262
263         # delete any other directory entries that were not in found in 'items'
264         for i in oldentries:
265             llfuse.invalidate_entry(self.inode, str(i))
266             self.inodes.del_entry(oldentries[i])
267             changed = True
268
269         if changed:
270             self._mtime = time.time()
271
272         self.fresh()
273
274     def clear(self):
275         '''Delete all entries'''
276         oldentries = self._entries
277         self._entries = {}
278         for n in oldentries:
279             if isinstance(n, Directory):
280                 n.clear()
281             llfuse.invalidate_entry(self.inode, str(n))
282             self.inodes.del_entry(oldentries[n])
283         self.invalidate()
284
285     def mtime(self):
286         return self._mtime
287
288
289 class CollectionDirectory(Directory):
290     '''Represents the root of a directory tree holding a collection.'''
291
292     def __init__(self, parent_inode, inodes, api, num_retries, collection):
293         super(CollectionDirectory, self).__init__(parent_inode)
294         self.inodes = inodes
295         self.api = api
296         self.num_retries = num_retries
297         self.collection_object_file = None
298         self.collection_object = None
299         if isinstance(collection, dict):
300             self.collection_locator = collection['uuid']
301         else:
302             self.collection_locator = collection
303
304     def same(self, i):
305         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
306
307     def new_collection(self, new_collection_object, coll_reader):
308         self.collection_object = new_collection_object
309
310         if self.collection_object_file is not None:
311             self.collection_object_file.update(self.collection_object)
312
313         self.clear()
314         for s in coll_reader.all_streams():
315             cwd = self
316             for part in s.name().split('/'):
317                 if part != '' and part != '.':
318                     partname = sanitize_filename(part)
319                     if partname not in cwd._entries:
320                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
321                     cwd = cwd._entries[partname]
322             for k, v in s.files().items():
323                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
324
325     def update(self):
326         try:
327             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
328                 return True
329
330             with llfuse.lock_released:
331                 coll_reader = arvados.CollectionReader(
332                     self.collection_locator, self.api, self.api.localkeep(),
333                     num_retries=self.num_retries)
334                 new_collection_object = coll_reader.api_response() or {}
335                 # If the Collection only exists in Keep, there will be no API
336                 # response.  Fill in the fields we need.
337                 if 'uuid' not in new_collection_object:
338                     new_collection_object['uuid'] = self.collection_locator
339                 if "portable_data_hash" not in new_collection_object:
340                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
341                 if 'manifest_text' not in new_collection_object:
342                     new_collection_object['manifest_text'] = coll_reader.manifest_text()
343                 coll_reader.normalize()
344             # end with llfuse.lock_released, re-acquire lock
345
346             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
347                 self.new_collection(new_collection_object, coll_reader)
348
349             self.fresh()
350             return True
351         except apiclient.errors.NotFoundError:
352             _logger.exception("arv-mount %s: error", self.collection_locator)
353         except arvados.errors.ArgumentError as detail:
354             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
355             if self.collection_object is not None and "manifest_text" in self.collection_object:
356                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
357         except Exception:
358             _logger.exception("arv-mount %s: error", self.collection_locator)
359             if self.collection_object is not None and "manifest_text" in self.collection_object:
360                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
361         return False
362
363     def __getitem__(self, item):
364         self.checkupdate()
365         if item == '.arvados#collection':
366             if self.collection_object_file is None:
367                 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
368                 self.inodes.add_entry(self.collection_object_file)
369             return self.collection_object_file
370         else:
371             return super(CollectionDirectory, self).__getitem__(item)
372
373     def __contains__(self, k):
374         if k == '.arvados#collection':
375             return True
376         else:
377             return super(CollectionDirectory, self).__contains__(k)
378
379     def mtime(self):
380         self.checkupdate()
381         return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
382
383
384 class MagicDirectory(Directory):
385     '''A special directory that logically contains the set of all extant keep
386     locators.  When a file is referenced by lookup(), it is tested to see if it
387     is a valid keep locator to a manifest, and if so, loads the manifest
388     contents as a subdirectory of this directory with the locator as the
389     directory name.  Since querying a list of all extant keep locators is
390     impractical, only collections that have already been accessed are visible
391     to readdir().
392     '''
393
394     README_TEXT = '''
395 This directory provides access to Arvados collections as subdirectories listed
396 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
397 the form '1234567890abcdefghijklmnopqrstuv+123').
398
399 Note that this directory will appear empty until you attempt to access a
400 specific collection subdirectory (such as trying to 'cd' into it), at which
401 point the collection will actually be looked up on the server and the directory
402 will appear if it exists.
403 '''.lstrip()
404
405     def __init__(self, parent_inode, inodes, api, num_retries):
406         super(MagicDirectory, self).__init__(parent_inode)
407         self.inodes = inodes
408         self.api = api
409         self.num_retries = num_retries
410
411     def __setattr__(self, name, value):
412         super(MagicDirectory, self).__setattr__(name, value)
413         # When we're assigned an inode, add a README.
414         if ((name == 'inode') and (self.inode is not None) and
415               (not self._entries)):
416             self._entries['README'] = self.inodes.add_entry(
417                 StringFile(self.inode, self.README_TEXT, time.time()))
418             # If we're the root directory, add an identical by_id subdirectory.
419             if self.inode == llfuse.ROOT_INODE:
420                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
421                         self.inode, self.inodes, self.api, self.num_retries))
422
423     def __contains__(self, k):
424         if k in self._entries:
425             return True
426
427         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
428             return False
429
430         try:
431             e = self.inodes.add_entry(CollectionDirectory(
432                     self.inode, self.inodes, self.api, self.num_retries, k))
433             if e.update():
434                 self._entries[k] = e
435                 return True
436             else:
437                 return False
438         except Exception as e:
439             _logger.debug('arv-mount exception keep %s', e)
440             return False
441
442     def __getitem__(self, item):
443         if item in self:
444             return self._entries[item]
445         else:
446             raise KeyError("No collection with id " + item)
447
448
449 class RecursiveInvalidateDirectory(Directory):
450     def invalidate(self):
451         if self.inode == llfuse.ROOT_INODE:
452             llfuse.lock.acquire()
453         try:
454             super(RecursiveInvalidateDirectory, self).invalidate()
455             for a in self._entries:
456                 self._entries[a].invalidate()
457         except Exception:
458             _logger.exception()
459         finally:
460             if self.inode == llfuse.ROOT_INODE:
461                 llfuse.lock.release()
462
463
464 class TagsDirectory(RecursiveInvalidateDirectory):
465     '''A special directory that contains as subdirectories all tags visible to the user.'''
466
467     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
468         super(TagsDirectory, self).__init__(parent_inode)
469         self.inodes = inodes
470         self.api = api
471         self.num_retries = num_retries
472         self._poll = True
473         self._poll_time = poll_time
474
475     def update(self):
476         with llfuse.lock_released:
477             tags = self.api.links().list(
478                 filters=[['link_class', '=', 'tag']],
479                 select=['name'], distinct=True
480                 ).execute(num_retries=self.num_retries)
481         if "items" in tags:
482             self.merge(tags['items'],
483                        lambda i: i['name'] if 'name' in i else i['uuid'],
484                        lambda a, i: a.tag == i,
485                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
486
487
488 class TagDirectory(Directory):
489     '''A special directory that contains as subdirectories all collections visible
490     to the user that are tagged with a particular tag.
491     '''
492
493     def __init__(self, parent_inode, inodes, api, num_retries, tag,
494                  poll=False, poll_time=60):
495         super(TagDirectory, self).__init__(parent_inode)
496         self.inodes = inodes
497         self.api = api
498         self.num_retries = num_retries
499         self.tag = tag
500         self._poll = poll
501         self._poll_time = poll_time
502
503     def update(self):
504         with llfuse.lock_released:
505             taggedcollections = self.api.links().list(
506                 filters=[['link_class', '=', 'tag'],
507                          ['name', '=', self.tag],
508                          ['head_uuid', 'is_a', 'arvados#collection']],
509                 select=['head_uuid']
510                 ).execute(num_retries=self.num_retries)
511         self.merge(taggedcollections['items'],
512                    lambda i: i['head_uuid'],
513                    lambda a, i: a.collection_locator == i['head_uuid'],
514                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
515
516
517 class ProjectDirectory(Directory):
518     '''A special directory that contains the contents of a project.'''
519
520     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
521                  poll=False, poll_time=60):
522         super(ProjectDirectory, self).__init__(parent_inode)
523         self.inodes = inodes
524         self.api = api
525         self.num_retries = num_retries
526         self.project_object = project_object
527         self.project_object_file = None
528         self.uuid = project_object['uuid']
529         self._poll = poll
530         self._poll_time = poll_time
531
532     def createDirectory(self, i):
533         if collection_uuid_pattern.match(i['uuid']):
534             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
535         elif group_uuid_pattern.match(i['uuid']):
536             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
537         elif link_uuid_pattern.match(i['uuid']):
538             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
539                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
540             else:
541                 return None
542         elif uuid_pattern.match(i['uuid']):
543             return ObjectFile(self.parent_inode, i)
544         else:
545             return None
546
547     def update(self):
548         if self.project_object_file == None:
549             self.project_object_file = ObjectFile(self.inode, self.project_object)
550             self.inodes.add_entry(self.project_object_file)
551
552         def namefn(i):
553             if 'name' in i:
554                 if i['name'] is None or len(i['name']) == 0:
555                     return None
556                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
557                     # collection or subproject
558                     return i['name']
559                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
560                     # name link
561                     return i['name']
562                 elif 'kind' in i and i['kind'].startswith('arvados#'):
563                     # something else
564                     return "{}.{}".format(i['name'], i['kind'][8:])
565             else:
566                 return None
567
568         def samefn(a, i):
569             if isinstance(a, CollectionDirectory):
570                 return a.collection_locator == i['uuid']
571             elif isinstance(a, ProjectDirectory):
572                 return a.uuid == i['uuid']
573             elif isinstance(a, ObjectFile):
574                 return a.uuid == i['uuid'] and not a.stale()
575             return False
576
577         with llfuse.lock_released:
578             if group_uuid_pattern.match(self.uuid):
579                 self.project_object = self.api.groups().get(
580                     uuid=self.uuid).execute(num_retries=self.num_retries)
581             elif user_uuid_pattern.match(self.uuid):
582                 self.project_object = self.api.users().get(
583                     uuid=self.uuid).execute(num_retries=self.num_retries)
584
585             contents = arvados.util.list_all(self.api.groups().contents,
586                                              self.num_retries, uuid=self.uuid)
587             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
588             contents += arvados.util.list_all(
589                 self.api.links().list, self.num_retries,
590                 filters=[['tail_uuid', '=', self.uuid],
591                          ['link_class', '=', 'name']])
592
593         # end with llfuse.lock_released, re-acquire lock
594
595         self.merge(contents,
596                    namefn,
597                    samefn,
598                    self.createDirectory)
599
600     def __getitem__(self, item):
601         self.checkupdate()
602         if item == '.arvados#project':
603             return self.project_object_file
604         else:
605             return super(ProjectDirectory, self).__getitem__(item)
606
607     def __contains__(self, k):
608         if k == '.arvados#project':
609             return True
610         else:
611             return super(ProjectDirectory, self).__contains__(k)
612
613
614 class SharedDirectory(Directory):
615     '''A special directory that represents users or groups who have shared projects with me.'''
616
617     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
618                  poll=False, poll_time=60):
619         super(SharedDirectory, self).__init__(parent_inode)
620         self.inodes = inodes
621         self.api = api
622         self.num_retries = num_retries
623         self.current_user = api.users().current().execute(num_retries=num_retries)
624         self._poll = True
625         self._poll_time = poll_time
626
627     def update(self):
628         with llfuse.lock_released:
629             all_projects = arvados.util.list_all(
630                 self.api.groups().list, self.num_retries,
631                 filters=[['group_class','=','project']])
632             objects = {}
633             for ob in all_projects:
634                 objects[ob['uuid']] = ob
635
636             roots = []
637             root_owners = {}
638             for ob in all_projects:
639                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
640                     roots.append(ob)
641                     root_owners[ob['owner_uuid']] = True
642
643             lusers = arvados.util.list_all(
644                 self.api.users().list, self.num_retries,
645                 filters=[['uuid','in', list(root_owners)]])
646             lgroups = arvados.util.list_all(
647                 self.api.groups().list, self.num_retries,
648                 filters=[['uuid','in', list(root_owners)]])
649
650             users = {}
651             groups = {}
652
653             for l in lusers:
654                 objects[l["uuid"]] = l
655             for l in lgroups:
656                 objects[l["uuid"]] = l
657
658             contents = {}
659             for r in root_owners:
660                 if r in objects:
661                     obr = objects[r]
662                     if "name" in obr:
663                         contents[obr["name"]] = obr
664                     if "first_name" in obr:
665                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
666
667             for r in roots:
668                 if r['owner_uuid'] not in objects:
669                     contents[r['name']] = r
670
671         # end with llfuse.lock_released, re-acquire lock
672
673         try:
674             self.merge(contents.items(),
675                        lambda i: i[0],
676                        lambda a, i: a.uuid == i[1]['uuid'],
677                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
678         except Exception:
679             _logger.exception()
680
681
682 class FileHandle(object):
683     '''Connects a numeric file handle to a File or Directory object that has
684     been opened by the client.'''
685
686     def __init__(self, fh, entry):
687         self.fh = fh
688         self.entry = entry
689
690
691 class Inodes(object):
692     '''Manage the set of inodes.  This is the mapping from a numeric id
693     to a concrete File or Directory object'''
694
695     def __init__(self):
696         self._entries = {}
697         self._counter = itertools.count(llfuse.ROOT_INODE)
698
699     def __getitem__(self, item):
700         return self._entries[item]
701
702     def __setitem__(self, key, item):
703         self._entries[key] = item
704
705     def __iter__(self):
706         return self._entries.iterkeys()
707
708     def items(self):
709         return self._entries.items()
710
711     def __contains__(self, k):
712         return k in self._entries
713
714     def add_entry(self, entry):
715         entry.inode = next(self._counter)
716         self._entries[entry.inode] = entry
717         return entry
718
719     def del_entry(self, entry):
720         llfuse.invalidate_inode(entry.inode)
721         del self._entries[entry.inode]
722
723 class Operations(llfuse.Operations):
724     '''This is the main interface with llfuse.  The methods on this object are
725     called by llfuse threads to service FUSE events to query and read from
726     the file system.
727
728     llfuse has its own global lock which is acquired before calling a request handler,
729     so request handlers do not run concurrently unless the lock is explicitly released
730     using "with llfuse.lock_released:"'''
731
732     def __init__(self, uid, gid, encoding="utf-8", set_executable_bit=False):
733         super(Operations, self).__init__()
734
735         self.inodes = Inodes()
736         self.uid = uid
737         self.gid = gid
738         self.encoding = encoding
739         self.set_executable_bit = set_executable_bit
740
741         # dict of inode to filehandle
742         self._filehandles = {}
743         self._filehandles_counter = 1
744
745         # Other threads that need to wait until the fuse driver
746         # is fully initialized should wait() on this event object.
747         self.initlock = threading.Event()
748
749     def init(self):
750         # Allow threads that are waiting for the driver to be finished
751         # initializing to continue
752         self.initlock.set()
753
754     def access(self, inode, mode, ctx):
755         return True
756
757     def getattr(self, inode):
758         if inode not in self.inodes:
759             raise llfuse.FUSEError(errno.ENOENT)
760
761         e = self.inodes[inode]
762
763         entry = llfuse.EntryAttributes()
764         entry.st_ino = inode
765         entry.generation = 0
766         entry.entry_timeout = 300
767         entry.attr_timeout = 300
768
769         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
770         if isinstance(e, Directory):
771             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
772         elif isinstance(e, StreamReaderFile) and self.set_executable_bit:
773             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
774         else:
775             entry.st_mode |= stat.S_IFREG
776
777         entry.st_nlink = 1
778         entry.st_uid = self.uid
779         entry.st_gid = self.gid
780         entry.st_rdev = 0
781
782         entry.st_size = e.size()
783
784         entry.st_blksize = 512
785         entry.st_blocks = (e.size()/512)+1
786         entry.st_atime = int(e.atime())
787         entry.st_mtime = int(e.mtime())
788         entry.st_ctime = int(e.mtime())
789
790         return entry
791
792     def lookup(self, parent_inode, name):
793         name = unicode(name, self.encoding)
794         _logger.debug("arv-mount lookup: parent_inode %i name %s",
795                       parent_inode, name)
796         inode = None
797
798         if name == '.':
799             inode = parent_inode
800         else:
801             if parent_inode in self.inodes:
802                 p = self.inodes[parent_inode]
803                 if name == '..':
804                     inode = p.parent_inode
805                 elif isinstance(p, Directory) and name in p:
806                     inode = p[name].inode
807
808         if inode != None:
809             return self.getattr(inode)
810         else:
811             raise llfuse.FUSEError(errno.ENOENT)
812
813     def open(self, inode, flags):
814         if inode in self.inodes:
815             p = self.inodes[inode]
816         else:
817             raise llfuse.FUSEError(errno.ENOENT)
818
819         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
820             raise llfuse.FUSEError(errno.EROFS)
821
822         if isinstance(p, Directory):
823             raise llfuse.FUSEError(errno.EISDIR)
824
825         fh = self._filehandles_counter
826         self._filehandles_counter += 1
827         self._filehandles[fh] = FileHandle(fh, p)
828         return fh
829
830     def read(self, fh, off, size):
831         _logger.debug("arv-mount read %i %i %i", fh, off, size)
832         if fh in self._filehandles:
833             handle = self._filehandles[fh]
834         else:
835             raise llfuse.FUSEError(errno.EBADF)
836
837         # update atime
838         handle.entry._atime = time.time()
839
840         try:
841             with llfuse.lock_released:
842                 return handle.entry.readfrom(off, size)
843         except arvados.errors.NotFoundError as e:
844             _logger.warning("Block not found: " + str(e))
845             raise llfuse.FUSEError(errno.EIO)
846         except Exception:
847             _logger.exception()
848             raise llfuse.FUSEError(errno.EIO)
849
850     def release(self, fh):
851         if fh in self._filehandles:
852             del self._filehandles[fh]
853
854     def opendir(self, inode):
855         _logger.debug("arv-mount opendir: inode %i", inode)
856
857         if inode in self.inodes:
858             p = self.inodes[inode]
859         else:
860             raise llfuse.FUSEError(errno.ENOENT)
861
862         if not isinstance(p, Directory):
863             raise llfuse.FUSEError(errno.ENOTDIR)
864
865         fh = self._filehandles_counter
866         self._filehandles_counter += 1
867         if p.parent_inode in self.inodes:
868             parent = self.inodes[p.parent_inode]
869         else:
870             raise llfuse.FUSEError(errno.EIO)
871
872         # update atime
873         p._atime = time.time()
874
875         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
876         return fh
877
878     def readdir(self, fh, off):
879         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
880
881         if fh in self._filehandles:
882             handle = self._filehandles[fh]
883         else:
884             raise llfuse.FUSEError(errno.EBADF)
885
886         _logger.debug("arv-mount handle.entry %s", handle.entry)
887
888         e = off
889         while e < len(handle.entry):
890             if handle.entry[e][1].inode in self.inodes:
891                 try:
892                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
893                 except UnicodeEncodeError:
894                     pass
895             e += 1
896
897     def releasedir(self, fh):
898         del self._filehandles[fh]
899
900     def statfs(self):
901         st = llfuse.StatvfsData()
902         st.f_bsize = 64 * 1024
903         st.f_blocks = 0
904         st.f_files = 0
905
906         st.f_bfree = 0
907         st.f_bavail = 0
908
909         st.f_ffree = 0
910         st.f_favail = 0
911
912         st.f_frsize = 0
913         return st
914
915     # The llfuse documentation recommends only overloading functions that
916     # are actually implemented, as the default implementation will raise ENOSYS.
917     # However, there is a bug in the llfuse default implementation of create()
918     # "create() takes exactly 5 positional arguments (6 given)" which will crash
919     # arv-mount.
920     # The workaround is to implement it with the proper number of parameters,
921     # and then everything works out.
922     def create(self, p1, p2, p3, p4, p5):
923         raise llfuse.FUSEError(errno.EROFS)