4904: Refactor arv-web main() into ArvWeb class. Add CollectionDirectory.change_coll...
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
25
26 _logger = logging.getLogger('arvados.arvados_fuse')
27
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
32
33 class SafeApi(object):
34     '''Threadsafe wrapper for API object.  This stores and returns a different api
35     object per thread, because httplib2 which underlies apiclient is not
36     threadsafe.
37     '''
38
39     def __init__(self, config):
40         self.host = config.get('ARVADOS_API_HOST')
41         self.api_token = config.get('ARVADOS_API_TOKEN')
42         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
43         self.local = threading.local()
44         self.block_cache = arvados.KeepBlockCache()
45
46     def localapi(self):
47         if 'api' not in self.local.__dict__:
48             self.local.api = arvados.api('v1', False, self.host,
49                                          self.api_token, self.insecure)
50         return self.local.api
51
52     def localkeep(self):
53         if 'keep' not in self.local.__dict__:
54             self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
55         return self.local.keep
56
57     def __getattr__(self, name):
58         # Proxy nonexistent attributes to the local API client.
59         try:
60             return getattr(self.localapi(), name)
61         except AttributeError:
62             return super(SafeApi, self).__getattr__(name)
63
64
65 def convertTime(t):
66     '''Parse Arvados timestamp to unix time.'''
67     try:
68         return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
69     except (TypeError, ValueError):
70         return 0
71
72 def sanitize_filename(dirty):
73     '''Replace disallowed filename characters with harmless "_".'''
74     if dirty is None:
75         return None
76     elif dirty == '':
77         return '_'
78     elif dirty == '.':
79         return '_'
80     elif dirty == '..':
81         return '__'
82     else:
83         return _disallowed_filename_characters.sub('_', dirty)
84
85
86 class FreshBase(object):
87     '''Base class for maintaining fresh/stale state to determine when to update.'''
88     def __init__(self):
89         self._stale = True
90         self._poll = False
91         self._last_update = time.time()
92         self._atime = time.time()
93         self._poll_time = 60
94
95     # Mark the value as stale
96     def invalidate(self):
97         self._stale = True
98
99     # Test if the entries dict is stale.
100     def stale(self):
101         if self._stale:
102             return True
103         if self._poll:
104             return (self._last_update + self._poll_time) < self._atime
105         return False
106
107     def fresh(self):
108         self._stale = False
109         self._last_update = time.time()
110
111     def atime(self):
112         return self._atime
113
114 class File(FreshBase):
115     '''Base for file objects.'''
116
117     def __init__(self, parent_inode, _mtime=0):
118         super(File, self).__init__()
119         self.inode = None
120         self.parent_inode = parent_inode
121         self._mtime = _mtime
122
123     def size(self):
124         return 0
125
126     def readfrom(self, off, size):
127         return ''
128
129     def mtime(self):
130         return self._mtime
131
132
133 class StreamReaderFile(File):
134     '''Wraps a StreamFileReader as a file.'''
135
136     def __init__(self, parent_inode, reader, _mtime):
137         super(StreamReaderFile, self).__init__(parent_inode, _mtime)
138         self.reader = reader
139
140     def size(self):
141         return self.reader.size()
142
143     def readfrom(self, off, size):
144         return self.reader.readfrom(off, size)
145
146     def stale(self):
147         return False
148
149
150 class StringFile(File):
151     '''Wrap a simple string as a file'''
152     def __init__(self, parent_inode, contents, _mtime):
153         super(StringFile, self).__init__(parent_inode, _mtime)
154         self.contents = contents
155
156     def size(self):
157         return len(self.contents)
158
159     def readfrom(self, off, size):
160         return self.contents[off:(off+size)]
161
162
163 class ObjectFile(StringFile):
164     '''Wrap a dict as a serialized json object.'''
165
166     def __init__(self, parent_inode, obj):
167         super(ObjectFile, self).__init__(parent_inode, "", 0)
168         self.uuid = obj['uuid']
169         self.update(obj)
170
171     def update(self, obj):
172         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
173         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
174
175
176 class Directory(FreshBase):
177     '''Generic directory object, backed by a dict.
178     Consists of a set of entries with the key representing the filename
179     and the value referencing a File or Directory object.
180     '''
181
182     def __init__(self, parent_inode):
183         super(Directory, self).__init__()
184
185         '''parent_inode is the integer inode number'''
186         self.inode = None
187         if not isinstance(parent_inode, int):
188             raise Exception("parent_inode should be an int")
189         self.parent_inode = parent_inode
190         self._entries = {}
191         self._mtime = time.time()
192
193     #  Overriden by subclasses to implement logic to update the entries dict
194     #  when the directory is stale
195     def update(self):
196         pass
197
198     # Only used when computing the size of the disk footprint of the directory
199     # (stub)
200     def size(self):
201         return 0
202
203     def checkupdate(self):
204         if self.stale():
205             try:
206                 self.update()
207             except apiclient.errors.HttpError as e:
208                 _logger.debug(e)
209
210     def __getitem__(self, item):
211         self.checkupdate()
212         return self._entries[item]
213
214     def items(self):
215         self.checkupdate()
216         return self._entries.items()
217
218     def __iter__(self):
219         self.checkupdate()
220         return self._entries.iterkeys()
221
222     def __contains__(self, k):
223         self.checkupdate()
224         return k in self._entries
225
226     def merge(self, items, fn, same, new_entry):
227         '''Helper method for updating the contents of the directory.  Takes a list
228         describing the new contents of the directory, reuse entries that are
229         the same in both the old and new lists, create new entries, and delete
230         old entries missing from the new list.
231
232         items: iterable with new directory contents
233
234         fn: function to take an entry in 'items' and return the desired file or
235         directory name, or None if this entry should be skipped
236
237         same: function to compare an existing entry (a File or Directory
238         object) with an entry in the items list to determine whether to keep
239         the existing entry.
240
241         new_entry: function to create a new directory entry (File or Directory
242         object) from an entry in the items list.
243
244         '''
245
246         oldentries = self._entries
247         self._entries = {}
248         changed = False
249         for i in items:
250             name = sanitize_filename(fn(i))
251             if name:
252                 if name in oldentries and same(oldentries[name], i):
253                     # move existing directory entry over
254                     self._entries[name] = oldentries[name]
255                     del oldentries[name]
256                 else:
257                     # create new directory entry
258                     ent = new_entry(i)
259                     if ent is not None:
260                         self._entries[name] = self.inodes.add_entry(ent)
261                         changed = True
262
263         # delete any other directory entries that were not in found in 'items'
264         for i in oldentries:
265             llfuse.invalidate_entry(self.inode, str(i))
266             self.inodes.del_entry(oldentries[i])
267             changed = True
268
269         if changed:
270             self._mtime = time.time()
271
272         self.fresh()
273
274     def clear(self):
275         '''Delete all entries'''
276         oldentries = self._entries
277         self._entries = {}
278         for n in oldentries:
279             if isinstance(n, Directory):
280                 n.clear()
281             llfuse.invalidate_entry(self.inode, str(n))
282             self.inodes.del_entry(oldentries[n])
283         llfuse.invalidate_inode(self.inode)
284         self.invalidate()
285
286     def mtime(self):
287         return self._mtime
288
289
290 class CollectionDirectory(Directory):
291     '''Represents the root of a directory tree holding a collection.'''
292
293     def __init__(self, parent_inode, inodes, api, num_retries, collection):
294         super(CollectionDirectory, self).__init__(parent_inode)
295         self.inodes = inodes
296         self.api = api
297         self.num_retries = num_retries
298         self.collection_object_file = None
299         self.collection_object = None
300         if isinstance(collection, dict):
301             self.collection_locator = collection['uuid']
302         else:
303             self.collection_locator = collection
304
305     def same(self, i):
306         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
307
308     def change_collection(self, new_locator):
309         self.collection_locator = new_locator
310         self.collection_object = None
311         self.update()
312
313     def new_collection(self, new_collection_object, coll_reader):
314         self.collection_object = new_collection_object
315
316         if self.collection_object_file is not None:
317             self.collection_object_file.update(self.collection_object)
318
319         self.clear()
320         for s in coll_reader.all_streams():
321             cwd = self
322             for part in s.name().split('/'):
323                 if part != '' and part != '.':
324                     partname = sanitize_filename(part)
325                     if partname not in cwd._entries:
326                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
327                     cwd = cwd._entries[partname]
328             for k, v in s.files().items():
329                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
330
331     def update(self):
332         try:
333             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
334                 return True
335
336             with llfuse.lock_released:
337                 coll_reader = arvados.CollectionReader(
338                     self.collection_locator, self.api, self.api.localkeep(),
339                     num_retries=self.num_retries)
340                 new_collection_object = coll_reader.api_response() or {}
341                 # If the Collection only exists in Keep, there will be no API
342                 # response.  Fill in the fields we need.
343                 if 'uuid' not in new_collection_object:
344                     new_collection_object['uuid'] = self.collection_locator
345                 if "portable_data_hash" not in new_collection_object:
346                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
347                 if 'manifest_text' not in new_collection_object:
348                     new_collection_object['manifest_text'] = coll_reader.manifest_text()
349                 coll_reader.normalize()
350             # end with llfuse.lock_released, re-acquire lock
351
352             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
353                 self.new_collection(new_collection_object, coll_reader)
354
355             self.fresh()
356             return True
357         except apiclient.errors.NotFoundError:
358             _logger.exception("arv-mount %s: error", self.collection_locator)
359         except arvados.errors.ArgumentError as detail:
360             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
361             if self.collection_object is not None and "manifest_text" in self.collection_object:
362                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
363         except Exception:
364             _logger.exception("arv-mount %s: error", self.collection_locator)
365             if self.collection_object is not None and "manifest_text" in self.collection_object:
366                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
367         return False
368
369     def __getitem__(self, item):
370         self.checkupdate()
371         if item == '.arvados#collection':
372             if self.collection_object_file is None:
373                 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
374                 self.inodes.add_entry(self.collection_object_file)
375             return self.collection_object_file
376         else:
377             return super(CollectionDirectory, self).__getitem__(item)
378
379     def __contains__(self, k):
380         if k == '.arvados#collection':
381             return True
382         else:
383             return super(CollectionDirectory, self).__contains__(k)
384
385     def mtime(self):
386         self.checkupdate()
387         return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
388
389
390 class MagicDirectory(Directory):
391     '''A special directory that logically contains the set of all extant keep
392     locators.  When a file is referenced by lookup(), it is tested to see if it
393     is a valid keep locator to a manifest, and if so, loads the manifest
394     contents as a subdirectory of this directory with the locator as the
395     directory name.  Since querying a list of all extant keep locators is
396     impractical, only collections that have already been accessed are visible
397     to readdir().
398     '''
399
400     README_TEXT = '''
401 This directory provides access to Arvados collections as subdirectories listed
402 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
403 the form '1234567890abcdefghijklmnopqrstuv+123').
404
405 Note that this directory will appear empty until you attempt to access a
406 specific collection subdirectory (such as trying to 'cd' into it), at which
407 point the collection will actually be looked up on the server and the directory
408 will appear if it exists.
409 '''.lstrip()
410
411     def __init__(self, parent_inode, inodes, api, num_retries):
412         super(MagicDirectory, self).__init__(parent_inode)
413         self.inodes = inodes
414         self.api = api
415         self.num_retries = num_retries
416
417     def __setattr__(self, name, value):
418         super(MagicDirectory, self).__setattr__(name, value)
419         # When we're assigned an inode, add a README.
420         if ((name == 'inode') and (self.inode is not None) and
421               (not self._entries)):
422             self._entries['README'] = self.inodes.add_entry(
423                 StringFile(self.inode, self.README_TEXT, time.time()))
424             # If we're the root directory, add an identical by_id subdirectory.
425             if self.inode == llfuse.ROOT_INODE:
426                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
427                         self.inode, self.inodes, self.api, self.num_retries))
428
429     def __contains__(self, k):
430         if k in self._entries:
431             return True
432
433         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
434             return False
435
436         try:
437             e = self.inodes.add_entry(CollectionDirectory(
438                     self.inode, self.inodes, self.api, self.num_retries, k))
439             if e.update():
440                 self._entries[k] = e
441                 return True
442             else:
443                 return False
444         except Exception as e:
445             _logger.debug('arv-mount exception keep %s', e)
446             return False
447
448     def __getitem__(self, item):
449         if item in self:
450             return self._entries[item]
451         else:
452             raise KeyError("No collection with id " + item)
453
454
455 class RecursiveInvalidateDirectory(Directory):
456     def invalidate(self):
457         if self.inode == llfuse.ROOT_INODE:
458             llfuse.lock.acquire()
459         try:
460             super(RecursiveInvalidateDirectory, self).invalidate()
461             for a in self._entries:
462                 self._entries[a].invalidate()
463         except Exception:
464             _logger.exception()
465         finally:
466             if self.inode == llfuse.ROOT_INODE:
467                 llfuse.lock.release()
468
469
470 class TagsDirectory(RecursiveInvalidateDirectory):
471     '''A special directory that contains as subdirectories all tags visible to the user.'''
472
473     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
474         super(TagsDirectory, self).__init__(parent_inode)
475         self.inodes = inodes
476         self.api = api
477         self.num_retries = num_retries
478         self._poll = True
479         self._poll_time = poll_time
480
481     def update(self):
482         with llfuse.lock_released:
483             tags = self.api.links().list(
484                 filters=[['link_class', '=', 'tag']],
485                 select=['name'], distinct=True
486                 ).execute(num_retries=self.num_retries)
487         if "items" in tags:
488             self.merge(tags['items'],
489                        lambda i: i['name'] if 'name' in i else i['uuid'],
490                        lambda a, i: a.tag == i,
491                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
492
493
494 class TagDirectory(Directory):
495     '''A special directory that contains as subdirectories all collections visible
496     to the user that are tagged with a particular tag.
497     '''
498
499     def __init__(self, parent_inode, inodes, api, num_retries, tag,
500                  poll=False, poll_time=60):
501         super(TagDirectory, self).__init__(parent_inode)
502         self.inodes = inodes
503         self.api = api
504         self.num_retries = num_retries
505         self.tag = tag
506         self._poll = poll
507         self._poll_time = poll_time
508
509     def update(self):
510         with llfuse.lock_released:
511             taggedcollections = self.api.links().list(
512                 filters=[['link_class', '=', 'tag'],
513                          ['name', '=', self.tag],
514                          ['head_uuid', 'is_a', 'arvados#collection']],
515                 select=['head_uuid']
516                 ).execute(num_retries=self.num_retries)
517         self.merge(taggedcollections['items'],
518                    lambda i: i['head_uuid'],
519                    lambda a, i: a.collection_locator == i['head_uuid'],
520                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
521
522
523 class ProjectDirectory(Directory):
524     '''A special directory that contains the contents of a project.'''
525
526     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
527                  poll=False, poll_time=60):
528         super(ProjectDirectory, self).__init__(parent_inode)
529         self.inodes = inodes
530         self.api = api
531         self.num_retries = num_retries
532         self.project_object = project_object
533         self.project_object_file = None
534         self.uuid = project_object['uuid']
535         self._poll = poll
536         self._poll_time = poll_time
537
538     def createDirectory(self, i):
539         if collection_uuid_pattern.match(i['uuid']):
540             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
541         elif group_uuid_pattern.match(i['uuid']):
542             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
543         elif link_uuid_pattern.match(i['uuid']):
544             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
545                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
546             else:
547                 return None
548         elif uuid_pattern.match(i['uuid']):
549             return ObjectFile(self.parent_inode, i)
550         else:
551             return None
552
553     def update(self):
554         if self.project_object_file == None:
555             self.project_object_file = ObjectFile(self.inode, self.project_object)
556             self.inodes.add_entry(self.project_object_file)
557
558         def namefn(i):
559             if 'name' in i:
560                 if i['name'] is None or len(i['name']) == 0:
561                     return None
562                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
563                     # collection or subproject
564                     return i['name']
565                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
566                     # name link
567                     return i['name']
568                 elif 'kind' in i and i['kind'].startswith('arvados#'):
569                     # something else
570                     return "{}.{}".format(i['name'], i['kind'][8:])
571             else:
572                 return None
573
574         def samefn(a, i):
575             if isinstance(a, CollectionDirectory):
576                 return a.collection_locator == i['uuid']
577             elif isinstance(a, ProjectDirectory):
578                 return a.uuid == i['uuid']
579             elif isinstance(a, ObjectFile):
580                 return a.uuid == i['uuid'] and not a.stale()
581             return False
582
583         with llfuse.lock_released:
584             if group_uuid_pattern.match(self.uuid):
585                 self.project_object = self.api.groups().get(
586                     uuid=self.uuid).execute(num_retries=self.num_retries)
587             elif user_uuid_pattern.match(self.uuid):
588                 self.project_object = self.api.users().get(
589                     uuid=self.uuid).execute(num_retries=self.num_retries)
590
591             contents = arvados.util.list_all(self.api.groups().contents,
592                                              self.num_retries, uuid=self.uuid)
593             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
594             contents += arvados.util.list_all(
595                 self.api.links().list, self.num_retries,
596                 filters=[['tail_uuid', '=', self.uuid],
597                          ['link_class', '=', 'name']])
598
599         # end with llfuse.lock_released, re-acquire lock
600
601         self.merge(contents,
602                    namefn,
603                    samefn,
604                    self.createDirectory)
605
606     def __getitem__(self, item):
607         self.checkupdate()
608         if item == '.arvados#project':
609             return self.project_object_file
610         else:
611             return super(ProjectDirectory, self).__getitem__(item)
612
613     def __contains__(self, k):
614         if k == '.arvados#project':
615             return True
616         else:
617             return super(ProjectDirectory, self).__contains__(k)
618
619
620 class SharedDirectory(Directory):
621     '''A special directory that represents users or groups who have shared projects with me.'''
622
623     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
624                  poll=False, poll_time=60):
625         super(SharedDirectory, self).__init__(parent_inode)
626         self.inodes = inodes
627         self.api = api
628         self.num_retries = num_retries
629         self.current_user = api.users().current().execute(num_retries=num_retries)
630         self._poll = True
631         self._poll_time = poll_time
632
633     def update(self):
634         with llfuse.lock_released:
635             all_projects = arvados.util.list_all(
636                 self.api.groups().list, self.num_retries,
637                 filters=[['group_class','=','project']])
638             objects = {}
639             for ob in all_projects:
640                 objects[ob['uuid']] = ob
641
642             roots = []
643             root_owners = {}
644             for ob in all_projects:
645                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
646                     roots.append(ob)
647                     root_owners[ob['owner_uuid']] = True
648
649             lusers = arvados.util.list_all(
650                 self.api.users().list, self.num_retries,
651                 filters=[['uuid','in', list(root_owners)]])
652             lgroups = arvados.util.list_all(
653                 self.api.groups().list, self.num_retries,
654                 filters=[['uuid','in', list(root_owners)]])
655
656             users = {}
657             groups = {}
658
659             for l in lusers:
660                 objects[l["uuid"]] = l
661             for l in lgroups:
662                 objects[l["uuid"]] = l
663
664             contents = {}
665             for r in root_owners:
666                 if r in objects:
667                     obr = objects[r]
668                     if "name" in obr:
669                         contents[obr["name"]] = obr
670                     if "first_name" in obr:
671                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
672
673             for r in roots:
674                 if r['owner_uuid'] not in objects:
675                     contents[r['name']] = r
676
677         # end with llfuse.lock_released, re-acquire lock
678
679         try:
680             self.merge(contents.items(),
681                        lambda i: i[0],
682                        lambda a, i: a.uuid == i[1]['uuid'],
683                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
684         except Exception:
685             _logger.exception()
686
687
688 class FileHandle(object):
689     '''Connects a numeric file handle to a File or Directory object that has
690     been opened by the client.'''
691
692     def __init__(self, fh, entry):
693         self.fh = fh
694         self.entry = entry
695
696
697 class Inodes(object):
698     '''Manage the set of inodes.  This is the mapping from a numeric id
699     to a concrete File or Directory object'''
700
701     def __init__(self):
702         self._entries = {}
703         self._counter = itertools.count(llfuse.ROOT_INODE)
704
705     def __getitem__(self, item):
706         return self._entries[item]
707
708     def __setitem__(self, key, item):
709         self._entries[key] = item
710
711     def __iter__(self):
712         return self._entries.iterkeys()
713
714     def items(self):
715         return self._entries.items()
716
717     def __contains__(self, k):
718         return k in self._entries
719
720     def add_entry(self, entry):
721         entry.inode = next(self._counter)
722         self._entries[entry.inode] = entry
723         return entry
724
725     def del_entry(self, entry):
726         llfuse.invalidate_inode(entry.inode)
727         del self._entries[entry.inode]
728
729 class Operations(llfuse.Operations):
730     '''This is the main interface with llfuse.  The methods on this object are
731     called by llfuse threads to service FUSE events to query and read from
732     the file system.
733
734     llfuse has its own global lock which is acquired before calling a request handler,
735     so request handlers do not run concurrently unless the lock is explicitly released
736     using "with llfuse.lock_released:"'''
737
738     def __init__(self, uid, gid, encoding="utf-8"):
739         super(Operations, self).__init__()
740
741         self.inodes = Inodes()
742         self.uid = uid
743         self.gid = gid
744         self.encoding = encoding
745
746         # dict of inode to filehandle
747         self._filehandles = {}
748         self._filehandles_counter = 1
749
750         # Other threads that need to wait until the fuse driver
751         # is fully initialized should wait() on this event object.
752         self.initlock = threading.Event()
753
754     def init(self):
755         # Allow threads that are waiting for the driver to be finished
756         # initializing to continue
757         self.initlock.set()
758
759     def access(self, inode, mode, ctx):
760         return True
761
762     def getattr(self, inode):
763         if inode not in self.inodes:
764             raise llfuse.FUSEError(errno.ENOENT)
765
766         e = self.inodes[inode]
767
768         entry = llfuse.EntryAttributes()
769         entry.st_ino = inode
770         entry.generation = 0
771         entry.entry_timeout = 300
772         entry.attr_timeout = 300
773
774         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
775         if isinstance(e, Directory):
776             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
777         elif isinstance(e, StreamReaderFile):
778             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
779         else:
780             entry.st_mode |= stat.S_IFREG
781
782         entry.st_nlink = 1
783         entry.st_uid = self.uid
784         entry.st_gid = self.gid
785         entry.st_rdev = 0
786
787         entry.st_size = e.size()
788
789         entry.st_blksize = 512
790         entry.st_blocks = (e.size()/512)+1
791         entry.st_atime = int(e.atime())
792         entry.st_mtime = int(e.mtime())
793         entry.st_ctime = int(e.mtime())
794
795         return entry
796
797     def lookup(self, parent_inode, name):
798         name = unicode(name, self.encoding)
799         _logger.debug("arv-mount lookup: parent_inode %i name %s",
800                       parent_inode, name)
801         inode = None
802
803         if name == '.':
804             inode = parent_inode
805         else:
806             if parent_inode in self.inodes:
807                 p = self.inodes[parent_inode]
808                 if name == '..':
809                     inode = p.parent_inode
810                 elif isinstance(p, Directory) and name in p:
811                     inode = p[name].inode
812
813         if inode != None:
814             return self.getattr(inode)
815         else:
816             raise llfuse.FUSEError(errno.ENOENT)
817
818     def open(self, inode, flags):
819         if inode in self.inodes:
820             p = self.inodes[inode]
821         else:
822             raise llfuse.FUSEError(errno.ENOENT)
823
824         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
825             raise llfuse.FUSEError(errno.EROFS)
826
827         if isinstance(p, Directory):
828             raise llfuse.FUSEError(errno.EISDIR)
829
830         fh = self._filehandles_counter
831         self._filehandles_counter += 1
832         self._filehandles[fh] = FileHandle(fh, p)
833         return fh
834
835     def read(self, fh, off, size):
836         _logger.debug("arv-mount read %i %i %i", fh, off, size)
837         if fh in self._filehandles:
838             handle = self._filehandles[fh]
839         else:
840             raise llfuse.FUSEError(errno.EBADF)
841
842         # update atime
843         handle.entry._atime = time.time()
844
845         try:
846             with llfuse.lock_released:
847                 return handle.entry.readfrom(off, size)
848         except arvados.errors.NotFoundError as e:
849             _logger.warning("Block not found: " + str(e))
850             raise llfuse.FUSEError(errno.EIO)
851         except Exception:
852             _logger.exception()
853             raise llfuse.FUSEError(errno.EIO)
854
855     def release(self, fh):
856         if fh in self._filehandles:
857             del self._filehandles[fh]
858
859     def opendir(self, inode):
860         _logger.debug("arv-mount opendir: inode %i", inode)
861
862         if inode in self.inodes:
863             p = self.inodes[inode]
864         else:
865             raise llfuse.FUSEError(errno.ENOENT)
866
867         if not isinstance(p, Directory):
868             raise llfuse.FUSEError(errno.ENOTDIR)
869
870         fh = self._filehandles_counter
871         self._filehandles_counter += 1
872         if p.parent_inode in self.inodes:
873             parent = self.inodes[p.parent_inode]
874         else:
875             raise llfuse.FUSEError(errno.EIO)
876
877         # update atime
878         p._atime = time.time()
879
880         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
881         return fh
882
883     def readdir(self, fh, off):
884         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
885
886         if fh in self._filehandles:
887             handle = self._filehandles[fh]
888         else:
889             raise llfuse.FUSEError(errno.EBADF)
890
891         _logger.debug("arv-mount handle.entry %s", handle.entry)
892
893         e = off
894         while e < len(handle.entry):
895             if handle.entry[e][1].inode in self.inodes:
896                 try:
897                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
898                 except UnicodeEncodeError:
899                     pass
900             e += 1
901
902     def releasedir(self, fh):
903         del self._filehandles[fh]
904
905     def statfs(self):
906         st = llfuse.StatvfsData()
907         st.f_bsize = 64 * 1024
908         st.f_blocks = 0
909         st.f_files = 0
910
911         st.f_bfree = 0
912         st.f_bavail = 0
913
914         st.f_ffree = 0
915         st.f_favail = 0
916
917         st.f_frsize = 0
918         return st
919
920     # The llfuse documentation recommends only overloading functions that
921     # are actually implemented, as the default implementation will raise ENOSYS.
922     # However, there is a bug in the llfuse default implementation of create()
923     # "create() takes exactly 5 positional arguments (6 given)" which will crash
924     # arv-mount.
925     # The workaround is to implement it with the proper number of parameters,
926     # and then everything works out.
927     def create(self, p1, p2, p3, p4, p5):
928         raise llfuse.FUSEError(errno.EROFS)