Merge branch 'master' into 4904-arv-web
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
25
26 _logger = logging.getLogger('arvados.arvados_fuse')
27
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
32
33 class SafeApi(object):
34     '''Threadsafe wrapper for API object.  This stores and returns a different api
35     object per thread, because httplib2 which underlies apiclient is not
36     threadsafe.
37     '''
38
39     def __init__(self, config):
40         self.host = config.get('ARVADOS_API_HOST')
41         self.api_token = config.get('ARVADOS_API_TOKEN')
42         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
43         self.local = threading.local()
44         self.block_cache = arvados.KeepBlockCache()
45
46     def localapi(self):
47         if 'api' not in self.local.__dict__:
48             self.local.api = arvados.api('v1', False, self.host,
49                                          self.api_token, self.insecure)
50         return self.local.api
51
52     def localkeep(self):
53         if 'keep' not in self.local.__dict__:
54             self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
55         return self.local.keep
56
57     def __getattr__(self, name):
58         # Proxy nonexistent attributes to the local API client.
59         try:
60             return getattr(self.localapi(), name)
61         except AttributeError:
62             return super(SafeApi, self).__getattr__(name)
63
64
65 def convertTime(t):
66     '''Parse Arvados timestamp to unix time.'''
67     try:
68         return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
69     except (TypeError, ValueError):
70         return 0
71
72 def sanitize_filename(dirty):
73     '''Replace disallowed filename characters with harmless "_".'''
74     if dirty is None:
75         return None
76     elif dirty == '':
77         return '_'
78     elif dirty == '.':
79         return '_'
80     elif dirty == '..':
81         return '__'
82     else:
83         return _disallowed_filename_characters.sub('_', dirty)
84
85
86 class FreshBase(object):
87     '''Base class for maintaining fresh/stale state to determine when to update.'''
88     def __init__(self):
89         self._stale = True
90         self._poll = False
91         self._last_update = time.time()
92         self._atime = time.time()
93         self._poll_time = 60
94
95     # Mark the value as stale
96     def invalidate(self):
97         self._stale = True
98
99     # Test if the entries dict is stale.
100     def stale(self):
101         if self._stale:
102             return True
103         if self._poll:
104             return (self._last_update + self._poll_time) < self._atime
105         return False
106
107     def fresh(self):
108         self._stale = False
109         self._last_update = time.time()
110
111     def atime(self):
112         return self._atime
113
114 class File(FreshBase):
115     '''Base for file objects.'''
116
117     def __init__(self, parent_inode, _mtime=0):
118         super(File, self).__init__()
119         self.inode = None
120         self.parent_inode = parent_inode
121         self._mtime = _mtime
122
123     def size(self):
124         return 0
125
126     def readfrom(self, off, size):
127         return ''
128
129     def mtime(self):
130         return self._mtime
131
132
133 class StreamReaderFile(File):
134     '''Wraps a StreamFileReader as a file.'''
135
136     def __init__(self, parent_inode, reader, _mtime):
137         super(StreamReaderFile, self).__init__(parent_inode, _mtime)
138         self.reader = reader
139
140     def size(self):
141         return self.reader.size()
142
143     def readfrom(self, off, size):
144         return self.reader.readfrom(off, size)
145
146     def stale(self):
147         return False
148
149
150 class StringFile(File):
151     '''Wrap a simple string as a file'''
152     def __init__(self, parent_inode, contents, _mtime):
153         super(StringFile, self).__init__(parent_inode, _mtime)
154         self.contents = contents
155
156     def size(self):
157         return len(self.contents)
158
159     def readfrom(self, off, size):
160         return self.contents[off:(off+size)]
161
162
163 class ObjectFile(StringFile):
164     '''Wrap a dict as a serialized json object.'''
165
166     def __init__(self, parent_inode, obj):
167         super(ObjectFile, self).__init__(parent_inode, "", 0)
168         self.uuid = obj['uuid']
169         self.update(obj)
170
171     def update(self, obj):
172         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
173         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
174
175
176 class Directory(FreshBase):
177     '''Generic directory object, backed by a dict.
178     Consists of a set of entries with the key representing the filename
179     and the value referencing a File or Directory object.
180     '''
181
182     def __init__(self, parent_inode):
183         super(Directory, self).__init__()
184
185         '''parent_inode is the integer inode number'''
186         self.inode = None
187         if not isinstance(parent_inode, int):
188             raise Exception("parent_inode should be an int")
189         self.parent_inode = parent_inode
190         self._entries = {}
191         self._mtime = time.time()
192
193     #  Overriden by subclasses to implement logic to update the entries dict
194     #  when the directory is stale
195     def update(self):
196         pass
197
198     # Only used when computing the size of the disk footprint of the directory
199     # (stub)
200     def size(self):
201         return 0
202
203     def checkupdate(self):
204         if self.stale():
205             try:
206                 self.update()
207             except apiclient.errors.HttpError as e:
208                 _logger.debug(e)
209
210     def __getitem__(self, item):
211         self.checkupdate()
212         return self._entries[item]
213
214     def items(self):
215         self.checkupdate()
216         return self._entries.items()
217
218     def __iter__(self):
219         self.checkupdate()
220         return self._entries.iterkeys()
221
222     def __contains__(self, k):
223         self.checkupdate()
224         return k in self._entries
225
226     def merge(self, items, fn, same, new_entry):
227         '''Helper method for updating the contents of the directory.  Takes a list
228         describing the new contents of the directory, reuse entries that are
229         the same in both the old and new lists, create new entries, and delete
230         old entries missing from the new list.
231
232         items: iterable with new directory contents
233
234         fn: function to take an entry in 'items' and return the desired file or
235         directory name, or None if this entry should be skipped
236
237         same: function to compare an existing entry (a File or Directory
238         object) with an entry in the items list to determine whether to keep
239         the existing entry.
240
241         new_entry: function to create a new directory entry (File or Directory
242         object) from an entry in the items list.
243
244         '''
245
246         oldentries = self._entries
247         self._entries = {}
248         changed = False
249         for i in items:
250             name = sanitize_filename(fn(i))
251             if name:
252                 if name in oldentries and same(oldentries[name], i):
253                     # move existing directory entry over
254                     self._entries[name] = oldentries[name]
255                     del oldentries[name]
256                 else:
257                     # create new directory entry
258                     ent = new_entry(i)
259                     if ent is not None:
260                         self._entries[name] = self.inodes.add_entry(ent)
261                         changed = True
262
263         # delete any other directory entries that were not in found in 'items'
264         for i in oldentries:
265             llfuse.invalidate_entry(self.inode, str(i))
266             self.inodes.del_entry(oldentries[i])
267             changed = True
268
269         if changed:
270             self._mtime = time.time()
271
272         self.fresh()
273
274     def clear(self):
275         '''Delete all entries'''
276         oldentries = self._entries
277         self._entries = {}
278         for n in oldentries:
279             if isinstance(n, Directory):
280                 n.clear()
281             llfuse.invalidate_entry(self.inode, str(n))
282             self.inodes.del_entry(oldentries[n])
283         llfuse.invalidate_inode(self.inode)
284         self.invalidate()
285
286     def mtime(self):
287         return self._mtime
288
289
290 class CollectionDirectory(Directory):
291     '''Represents the root of a directory tree holding a collection.'''
292
293     def __init__(self, parent_inode, inodes, api, num_retries, collection):
294         super(CollectionDirectory, self).__init__(parent_inode)
295         self.inodes = inodes
296         self.api = api
297         self.num_retries = num_retries
298         self.collection_object_file = None
299         self.collection_object = None
300         if isinstance(collection, dict):
301             self.collection_locator = collection['uuid']
302         else:
303             self.collection_locator = collection
304
305     def same(self, i):
306         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
307
308     # Used by arv-web.py to switch the contents of the CollectionDirectory
309     def change_collection(self, new_locator):
310         """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
311         self.collection_locator = new_locator
312         self.collection_object = None
313         self.update()
314
315     def new_collection(self, new_collection_object, coll_reader):
316         self.collection_object = new_collection_object
317
318         if self.collection_object_file is not None:
319             self.collection_object_file.update(self.collection_object)
320
321         self.clear()
322         for s in coll_reader.all_streams():
323             cwd = self
324             for part in s.name().split('/'):
325                 if part != '' and part != '.':
326                     partname = sanitize_filename(part)
327                     if partname not in cwd._entries:
328                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
329                     cwd = cwd._entries[partname]
330             for k, v in s.files().items():
331                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
332
333     def update(self):
334         try:
335             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
336                 return True
337
338             if self.collection_locator is None:
339                 self.fresh()
340                 return True
341
342             with llfuse.lock_released:
343                 coll_reader = arvados.CollectionReader(
344                     self.collection_locator, self.api, self.api.localkeep(),
345                     num_retries=self.num_retries)
346                 new_collection_object = coll_reader.api_response() or {}
347                 # If the Collection only exists in Keep, there will be no API
348                 # response.  Fill in the fields we need.
349                 if 'uuid' not in new_collection_object:
350                     new_collection_object['uuid'] = self.collection_locator
351                 if "portable_data_hash" not in new_collection_object:
352                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
353                 if 'manifest_text' not in new_collection_object:
354                     new_collection_object['manifest_text'] = coll_reader.manifest_text()
355                 coll_reader.normalize()
356             # end with llfuse.lock_released, re-acquire lock
357
358             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
359                 self.new_collection(new_collection_object, coll_reader)
360
361             self.fresh()
362             return True
363         except arvados.errors.NotFoundError:
364             _logger.exception("arv-mount %s: error", self.collection_locator)
365         except arvados.errors.ArgumentError as detail:
366             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
367             if self.collection_object is not None and "manifest_text" in self.collection_object:
368                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
369         except Exception:
370             _logger.exception("arv-mount %s: error", self.collection_locator)
371             if self.collection_object is not None and "manifest_text" in self.collection_object:
372                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
373         return False
374
375     def __getitem__(self, item):
376         self.checkupdate()
377         if item == '.arvados#collection':
378             if self.collection_object_file is None:
379                 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
380                 self.inodes.add_entry(self.collection_object_file)
381             return self.collection_object_file
382         else:
383             return super(CollectionDirectory, self).__getitem__(item)
384
385     def __contains__(self, k):
386         if k == '.arvados#collection':
387             return True
388         else:
389             return super(CollectionDirectory, self).__contains__(k)
390
391     def mtime(self):
392         self.checkupdate()
393         return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
394
395
396 class MagicDirectory(Directory):
397     '''A special directory that logically contains the set of all extant keep
398     locators.  When a file is referenced by lookup(), it is tested to see if it
399     is a valid keep locator to a manifest, and if so, loads the manifest
400     contents as a subdirectory of this directory with the locator as the
401     directory name.  Since querying a list of all extant keep locators is
402     impractical, only collections that have already been accessed are visible
403     to readdir().
404     '''
405
406     README_TEXT = '''
407 This directory provides access to Arvados collections as subdirectories listed
408 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
409 the form '1234567890abcdefghijklmnopqrstuv+123').
410
411 Note that this directory will appear empty until you attempt to access a
412 specific collection subdirectory (such as trying to 'cd' into it), at which
413 point the collection will actually be looked up on the server and the directory
414 will appear if it exists.
415 '''.lstrip()
416
417     def __init__(self, parent_inode, inodes, api, num_retries):
418         super(MagicDirectory, self).__init__(parent_inode)
419         self.inodes = inodes
420         self.api = api
421         self.num_retries = num_retries
422
423     def __setattr__(self, name, value):
424         super(MagicDirectory, self).__setattr__(name, value)
425         # When we're assigned an inode, add a README.
426         if ((name == 'inode') and (self.inode is not None) and
427               (not self._entries)):
428             self._entries['README'] = self.inodes.add_entry(
429                 StringFile(self.inode, self.README_TEXT, time.time()))
430             # If we're the root directory, add an identical by_id subdirectory.
431             if self.inode == llfuse.ROOT_INODE:
432                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
433                         self.inode, self.inodes, self.api, self.num_retries))
434
435     def __contains__(self, k):
436         if k in self._entries:
437             return True
438
439         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
440             return False
441
442         try:
443             e = self.inodes.add_entry(CollectionDirectory(
444                     self.inode, self.inodes, self.api, self.num_retries, k))
445             if e.update():
446                 self._entries[k] = e
447                 return True
448             else:
449                 return False
450         except Exception as e:
451             _logger.debug('arv-mount exception keep %s', e)
452             return False
453
454     def __getitem__(self, item):
455         if item in self:
456             return self._entries[item]
457         else:
458             raise KeyError("No collection with id " + item)
459
460
461 class RecursiveInvalidateDirectory(Directory):
462     def invalidate(self):
463         if self.inode == llfuse.ROOT_INODE:
464             llfuse.lock.acquire()
465         try:
466             super(RecursiveInvalidateDirectory, self).invalidate()
467             for a in self._entries:
468                 self._entries[a].invalidate()
469         except Exception:
470             _logger.exception()
471         finally:
472             if self.inode == llfuse.ROOT_INODE:
473                 llfuse.lock.release()
474
475
476 class TagsDirectory(RecursiveInvalidateDirectory):
477     '''A special directory that contains as subdirectories all tags visible to the user.'''
478
479     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
480         super(TagsDirectory, self).__init__(parent_inode)
481         self.inodes = inodes
482         self.api = api
483         self.num_retries = num_retries
484         self._poll = True
485         self._poll_time = poll_time
486
487     def update(self):
488         with llfuse.lock_released:
489             tags = self.api.links().list(
490                 filters=[['link_class', '=', 'tag']],
491                 select=['name'], distinct=True
492                 ).execute(num_retries=self.num_retries)
493         if "items" in tags:
494             self.merge(tags['items'],
495                        lambda i: i['name'],
496                        lambda a, i: a.tag == i['name'],
497                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
498
499
500 class TagDirectory(Directory):
501     '''A special directory that contains as subdirectories all collections visible
502     to the user that are tagged with a particular tag.
503     '''
504
505     def __init__(self, parent_inode, inodes, api, num_retries, tag,
506                  poll=False, poll_time=60):
507         super(TagDirectory, self).__init__(parent_inode)
508         self.inodes = inodes
509         self.api = api
510         self.num_retries = num_retries
511         self.tag = tag
512         self._poll = poll
513         self._poll_time = poll_time
514
515     def update(self):
516         with llfuse.lock_released:
517             taggedcollections = self.api.links().list(
518                 filters=[['link_class', '=', 'tag'],
519                          ['name', '=', self.tag],
520                          ['head_uuid', 'is_a', 'arvados#collection']],
521                 select=['head_uuid']
522                 ).execute(num_retries=self.num_retries)
523         self.merge(taggedcollections['items'],
524                    lambda i: i['head_uuid'],
525                    lambda a, i: a.collection_locator == i['head_uuid'],
526                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
527
528
529 class ProjectDirectory(Directory):
530     '''A special directory that contains the contents of a project.'''
531
532     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
533                  poll=False, poll_time=60):
534         super(ProjectDirectory, self).__init__(parent_inode)
535         self.inodes = inodes
536         self.api = api
537         self.num_retries = num_retries
538         self.project_object = project_object
539         self.project_object_file = None
540         self.uuid = project_object['uuid']
541         self._poll = poll
542         self._poll_time = poll_time
543
544     def createDirectory(self, i):
545         if collection_uuid_pattern.match(i['uuid']):
546             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
547         elif group_uuid_pattern.match(i['uuid']):
548             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
549         elif link_uuid_pattern.match(i['uuid']):
550             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
551                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
552             else:
553                 return None
554         elif uuid_pattern.match(i['uuid']):
555             return ObjectFile(self.parent_inode, i)
556         else:
557             return None
558
559     def update(self):
560         if self.project_object_file == None:
561             self.project_object_file = ObjectFile(self.inode, self.project_object)
562             self.inodes.add_entry(self.project_object_file)
563
564         def namefn(i):
565             if 'name' in i:
566                 if i['name'] is None or len(i['name']) == 0:
567                     return None
568                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
569                     # collection or subproject
570                     return i['name']
571                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
572                     # name link
573                     return i['name']
574                 elif 'kind' in i and i['kind'].startswith('arvados#'):
575                     # something else
576                     return "{}.{}".format(i['name'], i['kind'][8:])
577             else:
578                 return None
579
580         def samefn(a, i):
581             if isinstance(a, CollectionDirectory):
582                 return a.collection_locator == i['uuid']
583             elif isinstance(a, ProjectDirectory):
584                 return a.uuid == i['uuid']
585             elif isinstance(a, ObjectFile):
586                 return a.uuid == i['uuid'] and not a.stale()
587             return False
588
589         with llfuse.lock_released:
590             if group_uuid_pattern.match(self.uuid):
591                 self.project_object = self.api.groups().get(
592                     uuid=self.uuid).execute(num_retries=self.num_retries)
593             elif user_uuid_pattern.match(self.uuid):
594                 self.project_object = self.api.users().get(
595                     uuid=self.uuid).execute(num_retries=self.num_retries)
596
597             contents = arvados.util.list_all(self.api.groups().contents,
598                                              self.num_retries, uuid=self.uuid)
599             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
600             contents += arvados.util.list_all(
601                 self.api.links().list, self.num_retries,
602                 filters=[['tail_uuid', '=', self.uuid],
603                          ['link_class', '=', 'name']])
604
605         # end with llfuse.lock_released, re-acquire lock
606
607         self.merge(contents,
608                    namefn,
609                    samefn,
610                    self.createDirectory)
611
612     def __getitem__(self, item):
613         self.checkupdate()
614         if item == '.arvados#project':
615             return self.project_object_file
616         else:
617             return super(ProjectDirectory, self).__getitem__(item)
618
619     def __contains__(self, k):
620         if k == '.arvados#project':
621             return True
622         else:
623             return super(ProjectDirectory, self).__contains__(k)
624
625
626 class SharedDirectory(Directory):
627     '''A special directory that represents users or groups who have shared projects with me.'''
628
629     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
630                  poll=False, poll_time=60):
631         super(SharedDirectory, self).__init__(parent_inode)
632         self.inodes = inodes
633         self.api = api
634         self.num_retries = num_retries
635         self.current_user = api.users().current().execute(num_retries=num_retries)
636         self._poll = True
637         self._poll_time = poll_time
638
639     def update(self):
640         with llfuse.lock_released:
641             all_projects = arvados.util.list_all(
642                 self.api.groups().list, self.num_retries,
643                 filters=[['group_class','=','project']])
644             objects = {}
645             for ob in all_projects:
646                 objects[ob['uuid']] = ob
647
648             roots = []
649             root_owners = {}
650             for ob in all_projects:
651                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
652                     roots.append(ob)
653                     root_owners[ob['owner_uuid']] = True
654
655             lusers = arvados.util.list_all(
656                 self.api.users().list, self.num_retries,
657                 filters=[['uuid','in', list(root_owners)]])
658             lgroups = arvados.util.list_all(
659                 self.api.groups().list, self.num_retries,
660                 filters=[['uuid','in', list(root_owners)]])
661
662             users = {}
663             groups = {}
664
665             for l in lusers:
666                 objects[l["uuid"]] = l
667             for l in lgroups:
668                 objects[l["uuid"]] = l
669
670             contents = {}
671             for r in root_owners:
672                 if r in objects:
673                     obr = objects[r]
674                     if "name" in obr:
675                         contents[obr["name"]] = obr
676                     if "first_name" in obr:
677                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
678
679             for r in roots:
680                 if r['owner_uuid'] not in objects:
681                     contents[r['name']] = r
682
683         # end with llfuse.lock_released, re-acquire lock
684
685         try:
686             self.merge(contents.items(),
687                        lambda i: i[0],
688                        lambda a, i: a.uuid == i[1]['uuid'],
689                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
690         except Exception:
691             _logger.exception()
692
693
694 class FileHandle(object):
695     '''Connects a numeric file handle to a File or Directory object that has
696     been opened by the client.'''
697
698     def __init__(self, fh, entry):
699         self.fh = fh
700         self.entry = entry
701
702
703 class Inodes(object):
704     '''Manage the set of inodes.  This is the mapping from a numeric id
705     to a concrete File or Directory object'''
706
707     def __init__(self):
708         self._entries = {}
709         self._counter = itertools.count(llfuse.ROOT_INODE)
710
711     def __getitem__(self, item):
712         return self._entries[item]
713
714     def __setitem__(self, key, item):
715         self._entries[key] = item
716
717     def __iter__(self):
718         return self._entries.iterkeys()
719
720     def items(self):
721         return self._entries.items()
722
723     def __contains__(self, k):
724         return k in self._entries
725
726     def add_entry(self, entry):
727         entry.inode = next(self._counter)
728         self._entries[entry.inode] = entry
729         return entry
730
731     def del_entry(self, entry):
732         llfuse.invalidate_inode(entry.inode)
733         del self._entries[entry.inode]
734
735 class Operations(llfuse.Operations):
736     '''This is the main interface with llfuse.  The methods on this object are
737     called by llfuse threads to service FUSE events to query and read from
738     the file system.
739
740     llfuse has its own global lock which is acquired before calling a request handler,
741     so request handlers do not run concurrently unless the lock is explicitly released
742     using "with llfuse.lock_released:"'''
743
744     def __init__(self, uid, gid, encoding="utf-8"):
745         super(Operations, self).__init__()
746
747         self.inodes = Inodes()
748         self.uid = uid
749         self.gid = gid
750         self.encoding = encoding
751
752         # dict of inode to filehandle
753         self._filehandles = {}
754         self._filehandles_counter = 1
755
756         # Other threads that need to wait until the fuse driver
757         # is fully initialized should wait() on this event object.
758         self.initlock = threading.Event()
759
760     def init(self):
761         # Allow threads that are waiting for the driver to be finished
762         # initializing to continue
763         self.initlock.set()
764
765     def access(self, inode, mode, ctx):
766         return True
767
768     def getattr(self, inode):
769         if inode not in self.inodes:
770             raise llfuse.FUSEError(errno.ENOENT)
771
772         e = self.inodes[inode]
773
774         entry = llfuse.EntryAttributes()
775         entry.st_ino = inode
776         entry.generation = 0
777         entry.entry_timeout = 300
778         entry.attr_timeout = 300
779
780         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
781         if isinstance(e, Directory):
782             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
783         elif isinstance(e, StreamReaderFile):
784             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
785         else:
786             entry.st_mode |= stat.S_IFREG
787
788         entry.st_nlink = 1
789         entry.st_uid = self.uid
790         entry.st_gid = self.gid
791         entry.st_rdev = 0
792
793         entry.st_size = e.size()
794
795         entry.st_blksize = 512
796         entry.st_blocks = (e.size()/512)+1
797         entry.st_atime = int(e.atime())
798         entry.st_mtime = int(e.mtime())
799         entry.st_ctime = int(e.mtime())
800
801         return entry
802
803     def lookup(self, parent_inode, name):
804         name = unicode(name, self.encoding)
805         _logger.debug("arv-mount lookup: parent_inode %i name %s",
806                       parent_inode, name)
807         inode = None
808
809         if name == '.':
810             inode = parent_inode
811         else:
812             if parent_inode in self.inodes:
813                 p = self.inodes[parent_inode]
814                 if name == '..':
815                     inode = p.parent_inode
816                 elif isinstance(p, Directory) and name in p:
817                     inode = p[name].inode
818
819         if inode != None:
820             return self.getattr(inode)
821         else:
822             raise llfuse.FUSEError(errno.ENOENT)
823
824     def open(self, inode, flags):
825         if inode in self.inodes:
826             p = self.inodes[inode]
827         else:
828             raise llfuse.FUSEError(errno.ENOENT)
829
830         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
831             raise llfuse.FUSEError(errno.EROFS)
832
833         if isinstance(p, Directory):
834             raise llfuse.FUSEError(errno.EISDIR)
835
836         fh = self._filehandles_counter
837         self._filehandles_counter += 1
838         self._filehandles[fh] = FileHandle(fh, p)
839         return fh
840
841     def read(self, fh, off, size):
842         _logger.debug("arv-mount read %i %i %i", fh, off, size)
843         if fh in self._filehandles:
844             handle = self._filehandles[fh]
845         else:
846             raise llfuse.FUSEError(errno.EBADF)
847
848         # update atime
849         handle.entry._atime = time.time()
850
851         try:
852             with llfuse.lock_released:
853                 return handle.entry.readfrom(off, size)
854         except arvados.errors.NotFoundError as e:
855             _logger.warning("Block not found: " + str(e))
856             raise llfuse.FUSEError(errno.EIO)
857         except Exception:
858             _logger.exception()
859             raise llfuse.FUSEError(errno.EIO)
860
861     def release(self, fh):
862         if fh in self._filehandles:
863             del self._filehandles[fh]
864
865     def opendir(self, inode):
866         _logger.debug("arv-mount opendir: inode %i", inode)
867
868         if inode in self.inodes:
869             p = self.inodes[inode]
870         else:
871             raise llfuse.FUSEError(errno.ENOENT)
872
873         if not isinstance(p, Directory):
874             raise llfuse.FUSEError(errno.ENOTDIR)
875
876         fh = self._filehandles_counter
877         self._filehandles_counter += 1
878         if p.parent_inode in self.inodes:
879             parent = self.inodes[p.parent_inode]
880         else:
881             raise llfuse.FUSEError(errno.EIO)
882
883         # update atime
884         p._atime = time.time()
885
886         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
887         return fh
888
889     def readdir(self, fh, off):
890         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
891
892         if fh in self._filehandles:
893             handle = self._filehandles[fh]
894         else:
895             raise llfuse.FUSEError(errno.EBADF)
896
897         _logger.debug("arv-mount handle.entry %s", handle.entry)
898
899         e = off
900         while e < len(handle.entry):
901             if handle.entry[e][1].inode in self.inodes:
902                 try:
903                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
904                 except UnicodeEncodeError:
905                     pass
906             e += 1
907
908     def releasedir(self, fh):
909         del self._filehandles[fh]
910
911     def statfs(self):
912         st = llfuse.StatvfsData()
913         st.f_bsize = 64 * 1024
914         st.f_blocks = 0
915         st.f_files = 0
916
917         st.f_bfree = 0
918         st.f_bavail = 0
919
920         st.f_ffree = 0
921         st.f_favail = 0
922
923         st.f_frsize = 0
924         return st
925
926     # The llfuse documentation recommends only overloading functions that
927     # are actually implemented, as the default implementation will raise ENOSYS.
928     # However, there is a bug in the llfuse default implementation of create()
929     # "create() takes exactly 5 positional arguments (6 given)" which will crash
930     # arv-mount.
931     # The workaround is to implement it with the proper number of parameters,
932     # and then everything works out.
933     def create(self, p1, p2, p3, p4, p5):
934         raise llfuse.FUSEError(errno.EROFS)