5095: Don't check update when getting mtime().
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
25
26 _logger = logging.getLogger('arvados.arvados_fuse')
27
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
32
33 class SafeApi(object):
34     """Threadsafe wrapper for API object.
35
36     This stores and returns a different api object per thread, because
37     httplib2 which underlies apiclient is not threadsafe.
38     """
39
40     def __init__(self, config):
41         self.host = config.get('ARVADOS_API_HOST')
42         self.api_token = config.get('ARVADOS_API_TOKEN')
43         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
44         self.local = threading.local()
45         self.block_cache = arvados.KeepBlockCache()
46
47     def localapi(self):
48         if 'api' not in self.local.__dict__:
49             self.local.api = arvados.api(
50                 version='v1',
51                 host=self.host, token=self.api_token, insecure=self.insecure)
52         return self.local.api
53
54     def localkeep(self):
55         if 'keep' not in self.local.__dict__:
56             self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
57         return self.local.keep
58
59     def __getattr__(self, name):
60         # Proxy nonexistent attributes to the local API client.
61         try:
62             return getattr(self.localapi(), name)
63         except AttributeError:
64             return super(SafeApi, self).__getattr__(name)
65
66
67 def convertTime(t):
68     '''Parse Arvados timestamp to unix time.'''
69     try:
70         return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
71     except (TypeError, ValueError):
72         return 0
73
74 def sanitize_filename(dirty):
75     '''Replace disallowed filename characters with harmless "_".'''
76     if dirty is None:
77         return None
78     elif dirty == '':
79         return '_'
80     elif dirty == '.':
81         return '_'
82     elif dirty == '..':
83         return '__'
84     else:
85         return _disallowed_filename_characters.sub('_', dirty)
86
87
88 class FreshBase(object):
89     '''Base class for maintaining fresh/stale state to determine when to update.'''
90     def __init__(self):
91         self._stale = True
92         self._poll = False
93         self._last_update = time.time()
94         self._atime = time.time()
95         self._poll_time = 60
96
97     # Mark the value as stale
98     def invalidate(self):
99         self._stale = True
100
101     # Test if the entries dict is stale.
102     def stale(self):
103         if self._stale:
104             return True
105         if self._poll:
106             return (self._last_update + self._poll_time) < self._atime
107         return False
108
109     def fresh(self):
110         self._stale = False
111         self._last_update = time.time()
112
113     def atime(self):
114         return self._atime
115
116 class File(FreshBase):
117     '''Base for file objects.'''
118
119     def __init__(self, parent_inode, _mtime=0):
120         super(File, self).__init__()
121         self.inode = None
122         self.parent_inode = parent_inode
123         self._mtime = _mtime
124
125     def size(self):
126         return 0
127
128     def readfrom(self, off, size):
129         return ''
130
131     def mtime(self):
132         return self._mtime
133
134
135 class StreamReaderFile(File):
136     '''Wraps a StreamFileReader as a file.'''
137
138     def __init__(self, parent_inode, reader, _mtime):
139         super(StreamReaderFile, self).__init__(parent_inode, _mtime)
140         self.reader = reader
141
142     def size(self):
143         return self.reader.size()
144
145     def readfrom(self, off, size):
146         return self.reader.readfrom(off, size)
147
148     def stale(self):
149         return False
150
151
152 class StringFile(File):
153     '''Wrap a simple string as a file'''
154     def __init__(self, parent_inode, contents, _mtime):
155         super(StringFile, self).__init__(parent_inode, _mtime)
156         self.contents = contents
157
158     def size(self):
159         return len(self.contents)
160
161     def readfrom(self, off, size):
162         return self.contents[off:(off+size)]
163
164
165 class ObjectFile(StringFile):
166     '''Wrap a dict as a serialized json object.'''
167
168     def __init__(self, parent_inode, obj):
169         super(ObjectFile, self).__init__(parent_inode, "", 0)
170         self.uuid = obj['uuid']
171         self.update(obj)
172
173     def update(self, obj):
174         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
175         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
176
177
178 class Directory(FreshBase):
179     '''Generic directory object, backed by a dict.
180     Consists of a set of entries with the key representing the filename
181     and the value referencing a File or Directory object.
182     '''
183
184     def __init__(self, parent_inode):
185         super(Directory, self).__init__()
186
187         '''parent_inode is the integer inode number'''
188         self.inode = None
189         if not isinstance(parent_inode, int):
190             raise Exception("parent_inode should be an int")
191         self.parent_inode = parent_inode
192         self._entries = {}
193         self._mtime = time.time()
194
195     #  Overriden by subclasses to implement logic to update the entries dict
196     #  when the directory is stale
197     def update(self):
198         pass
199
200     # Only used when computing the size of the disk footprint of the directory
201     # (stub)
202     def size(self):
203         return 0
204
205     def checkupdate(self):
206         if self.stale():
207             try:
208                 self.update()
209             except apiclient.errors.HttpError as e:
210                 _logger.debug(e)
211
212     def __getitem__(self, item):
213         self.checkupdate()
214         return self._entries[item]
215
216     def items(self):
217         self.checkupdate()
218         return self._entries.items()
219
220     def __iter__(self):
221         self.checkupdate()
222         return self._entries.iterkeys()
223
224     def __contains__(self, k):
225         self.checkupdate()
226         return k in self._entries
227
228     def merge(self, items, fn, same, new_entry):
229         '''Helper method for updating the contents of the directory.  Takes a list
230         describing the new contents of the directory, reuse entries that are
231         the same in both the old and new lists, create new entries, and delete
232         old entries missing from the new list.
233
234         items: iterable with new directory contents
235
236         fn: function to take an entry in 'items' and return the desired file or
237         directory name, or None if this entry should be skipped
238
239         same: function to compare an existing entry (a File or Directory
240         object) with an entry in the items list to determine whether to keep
241         the existing entry.
242
243         new_entry: function to create a new directory entry (File or Directory
244         object) from an entry in the items list.
245
246         '''
247
248         oldentries = self._entries
249         self._entries = {}
250         changed = False
251         for i in items:
252             name = sanitize_filename(fn(i))
253             if name:
254                 if name in oldentries and same(oldentries[name], i):
255                     # move existing directory entry over
256                     self._entries[name] = oldentries[name]
257                     del oldentries[name]
258                 else:
259                     # create new directory entry
260                     ent = new_entry(i)
261                     if ent is not None:
262                         self._entries[name] = self.inodes.add_entry(ent)
263                         changed = True
264
265         # delete any other directory entries that were not in found in 'items'
266         for i in oldentries:
267             llfuse.invalidate_entry(self.inode, str(i))
268             self.inodes.del_entry(oldentries[i])
269             changed = True
270
271         if changed:
272             self._mtime = time.time()
273
274         self.fresh()
275
276     def clear(self):
277         '''Delete all entries'''
278         oldentries = self._entries
279         self._entries = {}
280         for n in oldentries:
281             if isinstance(n, Directory):
282                 n.clear()
283             llfuse.invalidate_entry(self.inode, str(n))
284             self.inodes.del_entry(oldentries[n])
285         llfuse.invalidate_inode(self.inode)
286         self.invalidate()
287
288     def mtime(self):
289         return self._mtime
290
291
292 class CollectionDirectory(Directory):
293     '''Represents the root of a directory tree holding a collection.'''
294
295     def __init__(self, parent_inode, inodes, api, num_retries, collection):
296         super(CollectionDirectory, self).__init__(parent_inode)
297         self.inodes = inodes
298         self.api = api
299         self.num_retries = num_retries
300         self.collection_object_file = None
301         self.collection_object = None
302         if isinstance(collection, dict):
303             self.collection_locator = collection['uuid']
304         else:
305             self.collection_locator = collection
306
307     def same(self, i):
308         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
309
310     # Used by arv-web.py to switch the contents of the CollectionDirectory
311     def change_collection(self, new_locator):
312         """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
313         self.collection_locator = new_locator
314         self.collection_object = None
315         self.update()
316
317     def new_collection(self, new_collection_object, coll_reader):
318         self.collection_object = new_collection_object
319
320         if self.collection_object_file is not None:
321             self.collection_object_file.update(self.collection_object)
322
323         self.clear()
324         for s in coll_reader.all_streams():
325             cwd = self
326             for part in s.name().split('/'):
327                 if part != '' and part != '.':
328                     partname = sanitize_filename(part)
329                     if partname not in cwd._entries:
330                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
331                     cwd = cwd._entries[partname]
332             for k, v in s.files().items():
333                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
334
335     def update(self):
336         try:
337             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
338                 return True
339
340             if self.collection_locator is None:
341                 self.fresh()
342                 return True
343
344             with llfuse.lock_released:
345                 coll_reader = arvados.CollectionReader(
346                     self.collection_locator, self.api, self.api.localkeep(),
347                     num_retries=self.num_retries)
348                 new_collection_object = coll_reader.api_response() or {}
349                 # If the Collection only exists in Keep, there will be no API
350                 # response.  Fill in the fields we need.
351                 if 'uuid' not in new_collection_object:
352                     new_collection_object['uuid'] = self.collection_locator
353                 if "portable_data_hash" not in new_collection_object:
354                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
355                 if 'manifest_text' not in new_collection_object:
356                     new_collection_object['manifest_text'] = coll_reader.manifest_text()
357                 coll_reader.normalize()
358             # end with llfuse.lock_released, re-acquire lock
359
360             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
361                 self.new_collection(new_collection_object, coll_reader)
362
363             self.fresh()
364             return True
365         except arvados.errors.NotFoundError:
366             _logger.exception("arv-mount %s: error", self.collection_locator)
367         except arvados.errors.ArgumentError as detail:
368             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
369             if self.collection_object is not None and "manifest_text" in self.collection_object:
370                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
371         except Exception:
372             _logger.exception("arv-mount %s: error", self.collection_locator)
373             if self.collection_object is not None and "manifest_text" in self.collection_object:
374                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
375         return False
376
377     def __getitem__(self, item):
378         self.checkupdate()
379         if item == '.arvados#collection':
380             if self.collection_object_file is None:
381                 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
382                 self.inodes.add_entry(self.collection_object_file)
383             return self.collection_object_file
384         else:
385             return super(CollectionDirectory, self).__getitem__(item)
386
387     def __contains__(self, k):
388         if k == '.arvados#collection':
389             return True
390         else:
391             return super(CollectionDirectory, self).__contains__(k)
392
393     def mtime(self):
394         return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
395
396
397 class MagicDirectory(Directory):
398     '''A special directory that logically contains the set of all extant keep
399     locators.  When a file is referenced by lookup(), it is tested to see if it
400     is a valid keep locator to a manifest, and if so, loads the manifest
401     contents as a subdirectory of this directory with the locator as the
402     directory name.  Since querying a list of all extant keep locators is
403     impractical, only collections that have already been accessed are visible
404     to readdir().
405     '''
406
407     README_TEXT = '''
408 This directory provides access to Arvados collections as subdirectories listed
409 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
410 the form '1234567890abcdefghijklmnopqrstuv+123').
411
412 Note that this directory will appear empty until you attempt to access a
413 specific collection subdirectory (such as trying to 'cd' into it), at which
414 point the collection will actually be looked up on the server and the directory
415 will appear if it exists.
416 '''.lstrip()
417
418     def __init__(self, parent_inode, inodes, api, num_retries):
419         super(MagicDirectory, self).__init__(parent_inode)
420         self.inodes = inodes
421         self.api = api
422         self.num_retries = num_retries
423
424     def __setattr__(self, name, value):
425         super(MagicDirectory, self).__setattr__(name, value)
426         # When we're assigned an inode, add a README.
427         if ((name == 'inode') and (self.inode is not None) and
428               (not self._entries)):
429             self._entries['README'] = self.inodes.add_entry(
430                 StringFile(self.inode, self.README_TEXT, time.time()))
431             # If we're the root directory, add an identical by_id subdirectory.
432             if self.inode == llfuse.ROOT_INODE:
433                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
434                         self.inode, self.inodes, self.api, self.num_retries))
435
436     def __contains__(self, k):
437         if k in self._entries:
438             return True
439
440         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
441             return False
442
443         try:
444             e = self.inodes.add_entry(CollectionDirectory(
445                     self.inode, self.inodes, self.api, self.num_retries, k))
446             if e.update():
447                 self._entries[k] = e
448                 return True
449             else:
450                 return False
451         except Exception as e:
452             _logger.debug('arv-mount exception keep %s', e)
453             return False
454
455     def __getitem__(self, item):
456         if item in self:
457             return self._entries[item]
458         else:
459             raise KeyError("No collection with id " + item)
460
461
462 class RecursiveInvalidateDirectory(Directory):
463     def invalidate(self):
464         if self.inode == llfuse.ROOT_INODE:
465             llfuse.lock.acquire()
466         try:
467             super(RecursiveInvalidateDirectory, self).invalidate()
468             for a in self._entries:
469                 self._entries[a].invalidate()
470         except Exception:
471             _logger.exception()
472         finally:
473             if self.inode == llfuse.ROOT_INODE:
474                 llfuse.lock.release()
475
476
477 class TagsDirectory(RecursiveInvalidateDirectory):
478     '''A special directory that contains as subdirectories all tags visible to the user.'''
479
480     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
481         super(TagsDirectory, self).__init__(parent_inode)
482         self.inodes = inodes
483         self.api = api
484         self.num_retries = num_retries
485         self._poll = True
486         self._poll_time = poll_time
487
488     def update(self):
489         with llfuse.lock_released:
490             tags = self.api.links().list(
491                 filters=[['link_class', '=', 'tag']],
492                 select=['name'], distinct=True
493                 ).execute(num_retries=self.num_retries)
494         if "items" in tags:
495             self.merge(tags['items'],
496                        lambda i: i['name'],
497                        lambda a, i: a.tag == i['name'],
498                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
499
500
501 class TagDirectory(Directory):
502     '''A special directory that contains as subdirectories all collections visible
503     to the user that are tagged with a particular tag.
504     '''
505
506     def __init__(self, parent_inode, inodes, api, num_retries, tag,
507                  poll=False, poll_time=60):
508         super(TagDirectory, self).__init__(parent_inode)
509         self.inodes = inodes
510         self.api = api
511         self.num_retries = num_retries
512         self.tag = tag
513         self._poll = poll
514         self._poll_time = poll_time
515
516     def update(self):
517         with llfuse.lock_released:
518             taggedcollections = self.api.links().list(
519                 filters=[['link_class', '=', 'tag'],
520                          ['name', '=', self.tag],
521                          ['head_uuid', 'is_a', 'arvados#collection']],
522                 select=['head_uuid']
523                 ).execute(num_retries=self.num_retries)
524         self.merge(taggedcollections['items'],
525                    lambda i: i['head_uuid'],
526                    lambda a, i: a.collection_locator == i['head_uuid'],
527                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
528
529
530 class ProjectDirectory(Directory):
531     '''A special directory that contains the contents of a project.'''
532
533     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
534                  poll=False, poll_time=60):
535         super(ProjectDirectory, self).__init__(parent_inode)
536         self.inodes = inodes
537         self.api = api
538         self.num_retries = num_retries
539         self.project_object = project_object
540         self.project_object_file = None
541         self.uuid = project_object['uuid']
542         self._poll = poll
543         self._poll_time = poll_time
544
545     def createDirectory(self, i):
546         if collection_uuid_pattern.match(i['uuid']):
547             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
548         elif group_uuid_pattern.match(i['uuid']):
549             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
550         elif link_uuid_pattern.match(i['uuid']):
551             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
552                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
553             else:
554                 return None
555         elif uuid_pattern.match(i['uuid']):
556             return ObjectFile(self.parent_inode, i)
557         else:
558             return None
559
560     def update(self):
561         if self.project_object_file == None:
562             self.project_object_file = ObjectFile(self.inode, self.project_object)
563             self.inodes.add_entry(self.project_object_file)
564
565         def namefn(i):
566             if 'name' in i:
567                 if i['name'] is None or len(i['name']) == 0:
568                     return None
569                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
570                     # collection or subproject
571                     return i['name']
572                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
573                     # name link
574                     return i['name']
575                 elif 'kind' in i and i['kind'].startswith('arvados#'):
576                     # something else
577                     return "{}.{}".format(i['name'], i['kind'][8:])
578             else:
579                 return None
580
581         def samefn(a, i):
582             if isinstance(a, CollectionDirectory):
583                 return a.collection_locator == i['uuid']
584             elif isinstance(a, ProjectDirectory):
585                 return a.uuid == i['uuid']
586             elif isinstance(a, ObjectFile):
587                 return a.uuid == i['uuid'] and not a.stale()
588             return False
589
590         with llfuse.lock_released:
591             if group_uuid_pattern.match(self.uuid):
592                 self.project_object = self.api.groups().get(
593                     uuid=self.uuid).execute(num_retries=self.num_retries)
594             elif user_uuid_pattern.match(self.uuid):
595                 self.project_object = self.api.users().get(
596                     uuid=self.uuid).execute(num_retries=self.num_retries)
597
598             contents = arvados.util.list_all(self.api.groups().contents,
599                                              self.num_retries, uuid=self.uuid)
600             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
601             contents += arvados.util.list_all(
602                 self.api.links().list, self.num_retries,
603                 filters=[['tail_uuid', '=', self.uuid],
604                          ['link_class', '=', 'name']])
605
606         # end with llfuse.lock_released, re-acquire lock
607
608         self.merge(contents,
609                    namefn,
610                    samefn,
611                    self.createDirectory)
612
613     def __getitem__(self, item):
614         self.checkupdate()
615         if item == '.arvados#project':
616             return self.project_object_file
617         else:
618             return super(ProjectDirectory, self).__getitem__(item)
619
620     def __contains__(self, k):
621         if k == '.arvados#project':
622             return True
623         else:
624             return super(ProjectDirectory, self).__contains__(k)
625
626
627 class SharedDirectory(Directory):
628     '''A special directory that represents users or groups who have shared projects with me.'''
629
630     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
631                  poll=False, poll_time=60):
632         super(SharedDirectory, self).__init__(parent_inode)
633         self.inodes = inodes
634         self.api = api
635         self.num_retries = num_retries
636         self.current_user = api.users().current().execute(num_retries=num_retries)
637         self._poll = True
638         self._poll_time = poll_time
639
640     def update(self):
641         with llfuse.lock_released:
642             all_projects = arvados.util.list_all(
643                 self.api.groups().list, self.num_retries,
644                 filters=[['group_class','=','project']])
645             objects = {}
646             for ob in all_projects:
647                 objects[ob['uuid']] = ob
648
649             roots = []
650             root_owners = {}
651             for ob in all_projects:
652                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
653                     roots.append(ob)
654                     root_owners[ob['owner_uuid']] = True
655
656             lusers = arvados.util.list_all(
657                 self.api.users().list, self.num_retries,
658                 filters=[['uuid','in', list(root_owners)]])
659             lgroups = arvados.util.list_all(
660                 self.api.groups().list, self.num_retries,
661                 filters=[['uuid','in', list(root_owners)]])
662
663             users = {}
664             groups = {}
665
666             for l in lusers:
667                 objects[l["uuid"]] = l
668             for l in lgroups:
669                 objects[l["uuid"]] = l
670
671             contents = {}
672             for r in root_owners:
673                 if r in objects:
674                     obr = objects[r]
675                     if "name" in obr:
676                         contents[obr["name"]] = obr
677                     if "first_name" in obr:
678                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
679
680             for r in roots:
681                 if r['owner_uuid'] not in objects:
682                     contents[r['name']] = r
683
684         # end with llfuse.lock_released, re-acquire lock
685
686         try:
687             self.merge(contents.items(),
688                        lambda i: i[0],
689                        lambda a, i: a.uuid == i[1]['uuid'],
690                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
691         except Exception:
692             _logger.exception()
693
694
695 class FileHandle(object):
696     '''Connects a numeric file handle to a File or Directory object that has
697     been opened by the client.'''
698
699     def __init__(self, fh, entry):
700         self.fh = fh
701         self.entry = entry
702
703
704 class Inodes(object):
705     '''Manage the set of inodes.  This is the mapping from a numeric id
706     to a concrete File or Directory object'''
707
708     def __init__(self):
709         self._entries = {}
710         self._counter = itertools.count(llfuse.ROOT_INODE)
711
712     def __getitem__(self, item):
713         return self._entries[item]
714
715     def __setitem__(self, key, item):
716         self._entries[key] = item
717
718     def __iter__(self):
719         return self._entries.iterkeys()
720
721     def items(self):
722         return self._entries.items()
723
724     def __contains__(self, k):
725         return k in self._entries
726
727     def add_entry(self, entry):
728         entry.inode = next(self._counter)
729         self._entries[entry.inode] = entry
730         return entry
731
732     def del_entry(self, entry):
733         llfuse.invalidate_inode(entry.inode)
734         del self._entries[entry.inode]
735
736 class Operations(llfuse.Operations):
737     '''This is the main interface with llfuse.  The methods on this object are
738     called by llfuse threads to service FUSE events to query and read from
739     the file system.
740
741     llfuse has its own global lock which is acquired before calling a request handler,
742     so request handlers do not run concurrently unless the lock is explicitly released
743     using "with llfuse.lock_released:"'''
744
745     def __init__(self, uid, gid, encoding="utf-8"):
746         super(Operations, self).__init__()
747
748         self.inodes = Inodes()
749         self.uid = uid
750         self.gid = gid
751         self.encoding = encoding
752
753         # dict of inode to filehandle
754         self._filehandles = {}
755         self._filehandles_counter = 1
756
757         # Other threads that need to wait until the fuse driver
758         # is fully initialized should wait() on this event object.
759         self.initlock = threading.Event()
760
761     def init(self):
762         # Allow threads that are waiting for the driver to be finished
763         # initializing to continue
764         self.initlock.set()
765
766     def access(self, inode, mode, ctx):
767         return True
768
769     def getattr(self, inode):
770         if inode not in self.inodes:
771             raise llfuse.FUSEError(errno.ENOENT)
772
773         e = self.inodes[inode]
774
775         entry = llfuse.EntryAttributes()
776         entry.st_ino = inode
777         entry.generation = 0
778         entry.entry_timeout = 300
779         entry.attr_timeout = 300
780
781         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
782         if isinstance(e, Directory):
783             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
784         elif isinstance(e, StreamReaderFile):
785             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
786         else:
787             entry.st_mode |= stat.S_IFREG
788
789         entry.st_nlink = 1
790         entry.st_uid = self.uid
791         entry.st_gid = self.gid
792         entry.st_rdev = 0
793
794         entry.st_size = e.size()
795
796         entry.st_blksize = 512
797         entry.st_blocks = (e.size()/512)+1
798         entry.st_atime = int(e.atime())
799         entry.st_mtime = int(e.mtime())
800         entry.st_ctime = int(e.mtime())
801
802         return entry
803
804     def lookup(self, parent_inode, name):
805         name = unicode(name, self.encoding)
806         _logger.debug("arv-mount lookup: parent_inode %i name %s",
807                       parent_inode, name)
808         inode = None
809
810         if name == '.':
811             inode = parent_inode
812         else:
813             if parent_inode in self.inodes:
814                 p = self.inodes[parent_inode]
815                 if name == '..':
816                     inode = p.parent_inode
817                 elif isinstance(p, Directory) and name in p:
818                     inode = p[name].inode
819
820         if inode != None:
821             return self.getattr(inode)
822         else:
823             raise llfuse.FUSEError(errno.ENOENT)
824
825     def open(self, inode, flags):
826         if inode in self.inodes:
827             p = self.inodes[inode]
828         else:
829             raise llfuse.FUSEError(errno.ENOENT)
830
831         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
832             raise llfuse.FUSEError(errno.EROFS)
833
834         if isinstance(p, Directory):
835             raise llfuse.FUSEError(errno.EISDIR)
836
837         fh = self._filehandles_counter
838         self._filehandles_counter += 1
839         self._filehandles[fh] = FileHandle(fh, p)
840         return fh
841
842     def read(self, fh, off, size):
843         _logger.debug("arv-mount read %i %i %i", fh, off, size)
844         if fh in self._filehandles:
845             handle = self._filehandles[fh]
846         else:
847             raise llfuse.FUSEError(errno.EBADF)
848
849         # update atime
850         handle.entry._atime = time.time()
851
852         try:
853             with llfuse.lock_released:
854                 return handle.entry.readfrom(off, size)
855         except arvados.errors.NotFoundError as e:
856             _logger.warning("Block not found: " + str(e))
857             raise llfuse.FUSEError(errno.EIO)
858         except Exception:
859             _logger.exception()
860             raise llfuse.FUSEError(errno.EIO)
861
862     def release(self, fh):
863         if fh in self._filehandles:
864             del self._filehandles[fh]
865
866     def opendir(self, inode):
867         _logger.debug("arv-mount opendir: inode %i", inode)
868
869         if inode in self.inodes:
870             p = self.inodes[inode]
871         else:
872             raise llfuse.FUSEError(errno.ENOENT)
873
874         if not isinstance(p, Directory):
875             raise llfuse.FUSEError(errno.ENOTDIR)
876
877         fh = self._filehandles_counter
878         self._filehandles_counter += 1
879         if p.parent_inode in self.inodes:
880             parent = self.inodes[p.parent_inode]
881         else:
882             raise llfuse.FUSEError(errno.EIO)
883
884         # update atime
885         p._atime = time.time()
886
887         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
888         return fh
889
890     def readdir(self, fh, off):
891         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
892
893         if fh in self._filehandles:
894             handle = self._filehandles[fh]
895         else:
896             raise llfuse.FUSEError(errno.EBADF)
897
898         _logger.debug("arv-mount handle.entry %s", handle.entry)
899
900         e = off
901         while e < len(handle.entry):
902             if handle.entry[e][1].inode in self.inodes:
903                 try:
904                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
905                 except UnicodeEncodeError:
906                     pass
907             e += 1
908
909     def releasedir(self, fh):
910         del self._filehandles[fh]
911
912     def statfs(self):
913         st = llfuse.StatvfsData()
914         st.f_bsize = 64 * 1024
915         st.f_blocks = 0
916         st.f_files = 0
917
918         st.f_bfree = 0
919         st.f_bavail = 0
920
921         st.f_ffree = 0
922         st.f_favail = 0
923
924         st.f_frsize = 0
925         return st
926
927     # The llfuse documentation recommends only overloading functions that
928     # are actually implemented, as the default implementation will raise ENOSYS.
929     # However, there is a bug in the llfuse default implementation of create()
930     # "create() takes exactly 5 positional arguments (6 given)" which will crash
931     # arv-mount.
932     # The workaround is to implement it with the proper number of parameters,
933     # and then everything works out.
934     def create(self, p1, p2, p3, p4, p5):
935         raise llfuse.FUSEError(errno.EROFS)