5095: Set _mtime when the collection object is updated.
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
25
26 _logger = logging.getLogger('arvados.arvados_fuse')
27
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
32
33 class SafeApi(object):
34     """Threadsafe wrapper for API object.
35
36     This stores and returns a different api object per thread, because
37     httplib2 which underlies apiclient is not threadsafe.
38     """
39
40     def __init__(self, config):
41         self.host = config.get('ARVADOS_API_HOST')
42         self.api_token = config.get('ARVADOS_API_TOKEN')
43         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
44         self.local = threading.local()
45         self.block_cache = arvados.KeepBlockCache()
46
47     def localapi(self):
48         if 'api' not in self.local.__dict__:
49             self.local.api = arvados.api(
50                 version='v1',
51                 host=self.host, token=self.api_token, insecure=self.insecure)
52         return self.local.api
53
54     def localkeep(self):
55         if 'keep' not in self.local.__dict__:
56             self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
57         return self.local.keep
58
59     def __getattr__(self, name):
60         # Proxy nonexistent attributes to the local API client.
61         try:
62             return getattr(self.localapi(), name)
63         except AttributeError:
64             return super(SafeApi, self).__getattr__(name)
65
66
67 def convertTime(t):
68     """Parse Arvados timestamp to unix time."""
69     if not t:
70         return 0
71     try:
72         return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
73     except (TypeError, ValueError):
74         return 0
75
76 def sanitize_filename(dirty):
77     '''Replace disallowed filename characters with harmless "_".'''
78     if dirty is None:
79         return None
80     elif dirty == '':
81         return '_'
82     elif dirty == '.':
83         return '_'
84     elif dirty == '..':
85         return '__'
86     else:
87         return _disallowed_filename_characters.sub('_', dirty)
88
89
90 class FreshBase(object):
91     '''Base class for maintaining fresh/stale state to determine when to update.'''
92     def __init__(self):
93         self._stale = True
94         self._poll = False
95         self._last_update = time.time()
96         self._atime = time.time()
97         self._poll_time = 60
98
99     # Mark the value as stale
100     def invalidate(self):
101         self._stale = True
102
103     # Test if the entries dict is stale.
104     def stale(self):
105         if self._stale:
106             return True
107         if self._poll:
108             return (self._last_update + self._poll_time) < self._atime
109         return False
110
111     def fresh(self):
112         self._stale = False
113         self._last_update = time.time()
114
115     def atime(self):
116         return self._atime
117
118 class File(FreshBase):
119     '''Base for file objects.'''
120
121     def __init__(self, parent_inode, _mtime=0):
122         super(File, self).__init__()
123         self.inode = None
124         self.parent_inode = parent_inode
125         self._mtime = _mtime
126
127     def size(self):
128         return 0
129
130     def readfrom(self, off, size):
131         return ''
132
133     def mtime(self):
134         return self._mtime
135
136
137 class StreamReaderFile(File):
138     '''Wraps a StreamFileReader as a file.'''
139
140     def __init__(self, parent_inode, reader, _mtime):
141         super(StreamReaderFile, self).__init__(parent_inode, _mtime)
142         self.reader = reader
143
144     def size(self):
145         return self.reader.size()
146
147     def readfrom(self, off, size):
148         return self.reader.readfrom(off, size)
149
150     def stale(self):
151         return False
152
153
154 class StringFile(File):
155     '''Wrap a simple string as a file'''
156     def __init__(self, parent_inode, contents, _mtime):
157         super(StringFile, self).__init__(parent_inode, _mtime)
158         self.contents = contents
159
160     def size(self):
161         return len(self.contents)
162
163     def readfrom(self, off, size):
164         return self.contents[off:(off+size)]
165
166
167 class ObjectFile(StringFile):
168     '''Wrap a dict as a serialized json object.'''
169
170     def __init__(self, parent_inode, obj):
171         super(ObjectFile, self).__init__(parent_inode, "", 0)
172         self.uuid = obj['uuid']
173         self.update(obj)
174
175     def update(self, obj):
176         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
177         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
178
179
180 class Directory(FreshBase):
181     '''Generic directory object, backed by a dict.
182     Consists of a set of entries with the key representing the filename
183     and the value referencing a File or Directory object.
184     '''
185
186     def __init__(self, parent_inode):
187         super(Directory, self).__init__()
188
189         '''parent_inode is the integer inode number'''
190         self.inode = None
191         if not isinstance(parent_inode, int):
192             raise Exception("parent_inode should be an int")
193         self.parent_inode = parent_inode
194         self._entries = {}
195         self._mtime = time.time()
196
197     #  Overriden by subclasses to implement logic to update the entries dict
198     #  when the directory is stale
199     def update(self):
200         pass
201
202     # Only used when computing the size of the disk footprint of the directory
203     # (stub)
204     def size(self):
205         return 0
206
207     def checkupdate(self):
208         if self.stale():
209             try:
210                 self.update()
211             except apiclient.errors.HttpError as e:
212                 _logger.debug(e)
213
214     def __getitem__(self, item):
215         self.checkupdate()
216         return self._entries[item]
217
218     def items(self):
219         self.checkupdate()
220         return self._entries.items()
221
222     def __iter__(self):
223         self.checkupdate()
224         return self._entries.iterkeys()
225
226     def __contains__(self, k):
227         self.checkupdate()
228         return k in self._entries
229
230     def merge(self, items, fn, same, new_entry):
231         '''Helper method for updating the contents of the directory.  Takes a list
232         describing the new contents of the directory, reuse entries that are
233         the same in both the old and new lists, create new entries, and delete
234         old entries missing from the new list.
235
236         items: iterable with new directory contents
237
238         fn: function to take an entry in 'items' and return the desired file or
239         directory name, or None if this entry should be skipped
240
241         same: function to compare an existing entry (a File or Directory
242         object) with an entry in the items list to determine whether to keep
243         the existing entry.
244
245         new_entry: function to create a new directory entry (File or Directory
246         object) from an entry in the items list.
247
248         '''
249
250         oldentries = self._entries
251         self._entries = {}
252         changed = False
253         for i in items:
254             name = sanitize_filename(fn(i))
255             if name:
256                 if name in oldentries and same(oldentries[name], i):
257                     # move existing directory entry over
258                     self._entries[name] = oldentries[name]
259                     del oldentries[name]
260                 else:
261                     # create new directory entry
262                     ent = new_entry(i)
263                     if ent is not None:
264                         self._entries[name] = self.inodes.add_entry(ent)
265                         changed = True
266
267         # delete any other directory entries that were not in found in 'items'
268         for i in oldentries:
269             llfuse.invalidate_entry(self.inode, str(i))
270             self.inodes.del_entry(oldentries[i])
271             changed = True
272
273         if changed:
274             self._mtime = time.time()
275
276         self.fresh()
277
278     def clear(self):
279         '''Delete all entries'''
280         oldentries = self._entries
281         self._entries = {}
282         for n in oldentries:
283             if isinstance(n, Directory):
284                 n.clear()
285             llfuse.invalidate_entry(self.inode, str(n))
286             self.inodes.del_entry(oldentries[n])
287         llfuse.invalidate_inode(self.inode)
288         self.invalidate()
289
290     def mtime(self):
291         return self._mtime
292
293
294 class CollectionDirectory(Directory):
295     '''Represents the root of a directory tree holding a collection.'''
296
297     def __init__(self, parent_inode, inodes, api, num_retries, collection):
298         super(CollectionDirectory, self).__init__(parent_inode)
299         self.inodes = inodes
300         self.api = api
301         self.num_retries = num_retries
302         self.collection_object_file = None
303         self.collection_object = None
304         if isinstance(collection, dict):
305             self.collection_locator = collection['uuid']
306             self._mtime = convertTime(collection.get('modified_at'))
307         else:
308             self.collection_locator = collection
309
310     def same(self, i):
311         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
312
313     # Used by arv-web.py to switch the contents of the CollectionDirectory
314     def change_collection(self, new_locator):
315         """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
316         self.collection_locator = new_locator
317         self.collection_object = None
318         self.update()
319
320     def new_collection(self, new_collection_object, coll_reader):
321         self.collection_object = new_collection_object
322
323         self._mtime = convertTime(self.collection_object.get('modified_at'))
324
325         if self.collection_object_file is not None:
326             self.collection_object_file.update(self.collection_object)
327
328         self.clear()
329         for s in coll_reader.all_streams():
330             cwd = self
331             for part in s.name().split('/'):
332                 if part != '' and part != '.':
333                     partname = sanitize_filename(part)
334                     if partname not in cwd._entries:
335                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
336                     cwd = cwd._entries[partname]
337             for k, v in s.files().items():
338                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
339
340     def update(self):
341         try:
342             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
343                 return True
344
345             if self.collection_locator is None:
346                 self.fresh()
347                 return True
348
349             with llfuse.lock_released:
350                 coll_reader = arvados.CollectionReader(
351                     self.collection_locator, self.api, self.api.localkeep(),
352                     num_retries=self.num_retries)
353                 new_collection_object = coll_reader.api_response() or {}
354                 # If the Collection only exists in Keep, there will be no API
355                 # response.  Fill in the fields we need.
356                 if 'uuid' not in new_collection_object:
357                     new_collection_object['uuid'] = self.collection_locator
358                 if "portable_data_hash" not in new_collection_object:
359                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
360                 if 'manifest_text' not in new_collection_object:
361                     new_collection_object['manifest_text'] = coll_reader.manifest_text()
362                 coll_reader.normalize()
363             # end with llfuse.lock_released, re-acquire lock
364
365             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
366                 self.new_collection(new_collection_object, coll_reader)
367
368             self.fresh()
369             return True
370         except arvados.errors.NotFoundError:
371             _logger.exception("arv-mount %s: error", self.collection_locator)
372         except arvados.errors.ArgumentError as detail:
373             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
374             if self.collection_object is not None and "manifest_text" in self.collection_object:
375                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
376         except Exception:
377             _logger.exception("arv-mount %s: error", self.collection_locator)
378             if self.collection_object is not None and "manifest_text" in self.collection_object:
379                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
380         return False
381
382     def __getitem__(self, item):
383         self.checkupdate()
384         if item == '.arvados#collection':
385             if self.collection_object_file is None:
386                 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
387                 self.inodes.add_entry(self.collection_object_file)
388             return self.collection_object_file
389         else:
390             return super(CollectionDirectory, self).__getitem__(item)
391
392     def __contains__(self, k):
393         if k == '.arvados#collection':
394             return True
395         else:
396             return super(CollectionDirectory, self).__contains__(k)
397
398
399 class MagicDirectory(Directory):
400     '''A special directory that logically contains the set of all extant keep
401     locators.  When a file is referenced by lookup(), it is tested to see if it
402     is a valid keep locator to a manifest, and if so, loads the manifest
403     contents as a subdirectory of this directory with the locator as the
404     directory name.  Since querying a list of all extant keep locators is
405     impractical, only collections that have already been accessed are visible
406     to readdir().
407     '''
408
409     README_TEXT = '''
410 This directory provides access to Arvados collections as subdirectories listed
411 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
412 the form '1234567890abcdefghijklmnopqrstuv+123').
413
414 Note that this directory will appear empty until you attempt to access a
415 specific collection subdirectory (such as trying to 'cd' into it), at which
416 point the collection will actually be looked up on the server and the directory
417 will appear if it exists.
418 '''.lstrip()
419
420     def __init__(self, parent_inode, inodes, api, num_retries):
421         super(MagicDirectory, self).__init__(parent_inode)
422         self.inodes = inodes
423         self.api = api
424         self.num_retries = num_retries
425
426     def __setattr__(self, name, value):
427         super(MagicDirectory, self).__setattr__(name, value)
428         # When we're assigned an inode, add a README.
429         if ((name == 'inode') and (self.inode is not None) and
430               (not self._entries)):
431             self._entries['README'] = self.inodes.add_entry(
432                 StringFile(self.inode, self.README_TEXT, time.time()))
433             # If we're the root directory, add an identical by_id subdirectory.
434             if self.inode == llfuse.ROOT_INODE:
435                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
436                         self.inode, self.inodes, self.api, self.num_retries))
437
438     def __contains__(self, k):
439         if k in self._entries:
440             return True
441
442         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
443             return False
444
445         try:
446             e = self.inodes.add_entry(CollectionDirectory(
447                     self.inode, self.inodes, self.api, self.num_retries, k))
448             if e.update():
449                 self._entries[k] = e
450                 return True
451             else:
452                 return False
453         except Exception as e:
454             _logger.debug('arv-mount exception keep %s', e)
455             return False
456
457     def __getitem__(self, item):
458         if item in self:
459             return self._entries[item]
460         else:
461             raise KeyError("No collection with id " + item)
462
463
464 class RecursiveInvalidateDirectory(Directory):
465     def invalidate(self):
466         if self.inode == llfuse.ROOT_INODE:
467             llfuse.lock.acquire()
468         try:
469             super(RecursiveInvalidateDirectory, self).invalidate()
470             for a in self._entries:
471                 self._entries[a].invalidate()
472         except Exception:
473             _logger.exception()
474         finally:
475             if self.inode == llfuse.ROOT_INODE:
476                 llfuse.lock.release()
477
478
479 class TagsDirectory(RecursiveInvalidateDirectory):
480     '''A special directory that contains as subdirectories all tags visible to the user.'''
481
482     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
483         super(TagsDirectory, self).__init__(parent_inode)
484         self.inodes = inodes
485         self.api = api
486         self.num_retries = num_retries
487         self._poll = True
488         self._poll_time = poll_time
489
490     def update(self):
491         with llfuse.lock_released:
492             tags = self.api.links().list(
493                 filters=[['link_class', '=', 'tag']],
494                 select=['name'], distinct=True
495                 ).execute(num_retries=self.num_retries)
496         if "items" in tags:
497             self.merge(tags['items'],
498                        lambda i: i['name'],
499                        lambda a, i: a.tag == i['name'],
500                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
501
502
503 class TagDirectory(Directory):
504     '''A special directory that contains as subdirectories all collections visible
505     to the user that are tagged with a particular tag.
506     '''
507
508     def __init__(self, parent_inode, inodes, api, num_retries, tag,
509                  poll=False, poll_time=60):
510         super(TagDirectory, self).__init__(parent_inode)
511         self.inodes = inodes
512         self.api = api
513         self.num_retries = num_retries
514         self.tag = tag
515         self._poll = poll
516         self._poll_time = poll_time
517
518     def update(self):
519         with llfuse.lock_released:
520             taggedcollections = self.api.links().list(
521                 filters=[['link_class', '=', 'tag'],
522                          ['name', '=', self.tag],
523                          ['head_uuid', 'is_a', 'arvados#collection']],
524                 select=['head_uuid']
525                 ).execute(num_retries=self.num_retries)
526         self.merge(taggedcollections['items'],
527                    lambda i: i['head_uuid'],
528                    lambda a, i: a.collection_locator == i['head_uuid'],
529                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
530
531
532 class ProjectDirectory(Directory):
533     '''A special directory that contains the contents of a project.'''
534
535     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
536                  poll=False, poll_time=60):
537         super(ProjectDirectory, self).__init__(parent_inode)
538         self.inodes = inodes
539         self.api = api
540         self.num_retries = num_retries
541         self.project_object = project_object
542         self.project_object_file = None
543         self.uuid = project_object['uuid']
544         self._poll = poll
545         self._poll_time = poll_time
546
547     def createDirectory(self, i):
548         if collection_uuid_pattern.match(i['uuid']):
549             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
550         elif group_uuid_pattern.match(i['uuid']):
551             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
552         elif link_uuid_pattern.match(i['uuid']):
553             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
554                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
555             else:
556                 return None
557         elif uuid_pattern.match(i['uuid']):
558             return ObjectFile(self.parent_inode, i)
559         else:
560             return None
561
562     def update(self):
563         if self.project_object_file == None:
564             self.project_object_file = ObjectFile(self.inode, self.project_object)
565             self.inodes.add_entry(self.project_object_file)
566
567         def namefn(i):
568             if 'name' in i:
569                 if i['name'] is None or len(i['name']) == 0:
570                     return None
571                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
572                     # collection or subproject
573                     return i['name']
574                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
575                     # name link
576                     return i['name']
577                 elif 'kind' in i and i['kind'].startswith('arvados#'):
578                     # something else
579                     return "{}.{}".format(i['name'], i['kind'][8:])
580             else:
581                 return None
582
583         def samefn(a, i):
584             if isinstance(a, CollectionDirectory):
585                 return a.collection_locator == i['uuid']
586             elif isinstance(a, ProjectDirectory):
587                 return a.uuid == i['uuid']
588             elif isinstance(a, ObjectFile):
589                 return a.uuid == i['uuid'] and not a.stale()
590             return False
591
592         with llfuse.lock_released:
593             if group_uuid_pattern.match(self.uuid):
594                 self.project_object = self.api.groups().get(
595                     uuid=self.uuid).execute(num_retries=self.num_retries)
596             elif user_uuid_pattern.match(self.uuid):
597                 self.project_object = self.api.users().get(
598                     uuid=self.uuid).execute(num_retries=self.num_retries)
599
600             contents = arvados.util.list_all(self.api.groups().contents,
601                                              self.num_retries, uuid=self.uuid)
602             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
603             contents += arvados.util.list_all(
604                 self.api.links().list, self.num_retries,
605                 filters=[['tail_uuid', '=', self.uuid],
606                          ['link_class', '=', 'name']])
607
608         # end with llfuse.lock_released, re-acquire lock
609
610         self.merge(contents,
611                    namefn,
612                    samefn,
613                    self.createDirectory)
614
615     def __getitem__(self, item):
616         self.checkupdate()
617         if item == '.arvados#project':
618             return self.project_object_file
619         else:
620             return super(ProjectDirectory, self).__getitem__(item)
621
622     def __contains__(self, k):
623         if k == '.arvados#project':
624             return True
625         else:
626             return super(ProjectDirectory, self).__contains__(k)
627
628
629 class SharedDirectory(Directory):
630     '''A special directory that represents users or groups who have shared projects with me.'''
631
632     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
633                  poll=False, poll_time=60):
634         super(SharedDirectory, self).__init__(parent_inode)
635         self.inodes = inodes
636         self.api = api
637         self.num_retries = num_retries
638         self.current_user = api.users().current().execute(num_retries=num_retries)
639         self._poll = True
640         self._poll_time = poll_time
641
642     def update(self):
643         with llfuse.lock_released:
644             all_projects = arvados.util.list_all(
645                 self.api.groups().list, self.num_retries,
646                 filters=[['group_class','=','project']])
647             objects = {}
648             for ob in all_projects:
649                 objects[ob['uuid']] = ob
650
651             roots = []
652             root_owners = {}
653             for ob in all_projects:
654                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
655                     roots.append(ob)
656                     root_owners[ob['owner_uuid']] = True
657
658             lusers = arvados.util.list_all(
659                 self.api.users().list, self.num_retries,
660                 filters=[['uuid','in', list(root_owners)]])
661             lgroups = arvados.util.list_all(
662                 self.api.groups().list, self.num_retries,
663                 filters=[['uuid','in', list(root_owners)]])
664
665             users = {}
666             groups = {}
667
668             for l in lusers:
669                 objects[l["uuid"]] = l
670             for l in lgroups:
671                 objects[l["uuid"]] = l
672
673             contents = {}
674             for r in root_owners:
675                 if r in objects:
676                     obr = objects[r]
677                     if "name" in obr:
678                         contents[obr["name"]] = obr
679                     if "first_name" in obr:
680                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
681
682             for r in roots:
683                 if r['owner_uuid'] not in objects:
684                     contents[r['name']] = r
685
686         # end with llfuse.lock_released, re-acquire lock
687
688         try:
689             self.merge(contents.items(),
690                        lambda i: i[0],
691                        lambda a, i: a.uuid == i[1]['uuid'],
692                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
693         except Exception:
694             _logger.exception()
695
696
697 class FileHandle(object):
698     '''Connects a numeric file handle to a File or Directory object that has
699     been opened by the client.'''
700
701     def __init__(self, fh, entry):
702         self.fh = fh
703         self.entry = entry
704
705
706 class Inodes(object):
707     '''Manage the set of inodes.  This is the mapping from a numeric id
708     to a concrete File or Directory object'''
709
710     def __init__(self):
711         self._entries = {}
712         self._counter = itertools.count(llfuse.ROOT_INODE)
713
714     def __getitem__(self, item):
715         return self._entries[item]
716
717     def __setitem__(self, key, item):
718         self._entries[key] = item
719
720     def __iter__(self):
721         return self._entries.iterkeys()
722
723     def items(self):
724         return self._entries.items()
725
726     def __contains__(self, k):
727         return k in self._entries
728
729     def add_entry(self, entry):
730         entry.inode = next(self._counter)
731         self._entries[entry.inode] = entry
732         return entry
733
734     def del_entry(self, entry):
735         llfuse.invalidate_inode(entry.inode)
736         del self._entries[entry.inode]
737
738 class Operations(llfuse.Operations):
739     '''This is the main interface with llfuse.  The methods on this object are
740     called by llfuse threads to service FUSE events to query and read from
741     the file system.
742
743     llfuse has its own global lock which is acquired before calling a request handler,
744     so request handlers do not run concurrently unless the lock is explicitly released
745     using "with llfuse.lock_released:"'''
746
747     def __init__(self, uid, gid, encoding="utf-8"):
748         super(Operations, self).__init__()
749
750         self.inodes = Inodes()
751         self.uid = uid
752         self.gid = gid
753         self.encoding = encoding
754
755         # dict of inode to filehandle
756         self._filehandles = {}
757         self._filehandles_counter = 1
758
759         # Other threads that need to wait until the fuse driver
760         # is fully initialized should wait() on this event object.
761         self.initlock = threading.Event()
762
763     def init(self):
764         # Allow threads that are waiting for the driver to be finished
765         # initializing to continue
766         self.initlock.set()
767
768     def access(self, inode, mode, ctx):
769         return True
770
771     def getattr(self, inode):
772         if inode not in self.inodes:
773             raise llfuse.FUSEError(errno.ENOENT)
774
775         e = self.inodes[inode]
776
777         entry = llfuse.EntryAttributes()
778         entry.st_ino = inode
779         entry.generation = 0
780         entry.entry_timeout = 300
781         entry.attr_timeout = 300
782
783         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
784         if isinstance(e, Directory):
785             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
786         elif isinstance(e, StreamReaderFile):
787             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
788         else:
789             entry.st_mode |= stat.S_IFREG
790
791         entry.st_nlink = 1
792         entry.st_uid = self.uid
793         entry.st_gid = self.gid
794         entry.st_rdev = 0
795
796         entry.st_size = e.size()
797
798         entry.st_blksize = 512
799         entry.st_blocks = (e.size()/512)+1
800         entry.st_atime = int(e.atime())
801         entry.st_mtime = int(e.mtime())
802         entry.st_ctime = int(e.mtime())
803
804         return entry
805
806     def lookup(self, parent_inode, name):
807         name = unicode(name, self.encoding)
808         _logger.debug("arv-mount lookup: parent_inode %i name %s",
809                       parent_inode, name)
810         inode = None
811
812         if name == '.':
813             inode = parent_inode
814         else:
815             if parent_inode in self.inodes:
816                 p = self.inodes[parent_inode]
817                 if name == '..':
818                     inode = p.parent_inode
819                 elif isinstance(p, Directory) and name in p:
820                     inode = p[name].inode
821
822         if inode != None:
823             return self.getattr(inode)
824         else:
825             raise llfuse.FUSEError(errno.ENOENT)
826
827     def open(self, inode, flags):
828         if inode in self.inodes:
829             p = self.inodes[inode]
830         else:
831             raise llfuse.FUSEError(errno.ENOENT)
832
833         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
834             raise llfuse.FUSEError(errno.EROFS)
835
836         if isinstance(p, Directory):
837             raise llfuse.FUSEError(errno.EISDIR)
838
839         fh = self._filehandles_counter
840         self._filehandles_counter += 1
841         self._filehandles[fh] = FileHandle(fh, p)
842         return fh
843
844     def read(self, fh, off, size):
845         _logger.debug("arv-mount read %i %i %i", fh, off, size)
846         if fh in self._filehandles:
847             handle = self._filehandles[fh]
848         else:
849             raise llfuse.FUSEError(errno.EBADF)
850
851         # update atime
852         handle.entry._atime = time.time()
853
854         try:
855             with llfuse.lock_released:
856                 return handle.entry.readfrom(off, size)
857         except arvados.errors.NotFoundError as e:
858             _logger.warning("Block not found: " + str(e))
859             raise llfuse.FUSEError(errno.EIO)
860         except Exception:
861             _logger.exception()
862             raise llfuse.FUSEError(errno.EIO)
863
864     def release(self, fh):
865         if fh in self._filehandles:
866             del self._filehandles[fh]
867
868     def opendir(self, inode):
869         _logger.debug("arv-mount opendir: inode %i", inode)
870
871         if inode in self.inodes:
872             p = self.inodes[inode]
873         else:
874             raise llfuse.FUSEError(errno.ENOENT)
875
876         if not isinstance(p, Directory):
877             raise llfuse.FUSEError(errno.ENOTDIR)
878
879         fh = self._filehandles_counter
880         self._filehandles_counter += 1
881         if p.parent_inode in self.inodes:
882             parent = self.inodes[p.parent_inode]
883         else:
884             raise llfuse.FUSEError(errno.EIO)
885
886         # update atime
887         p._atime = time.time()
888
889         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
890         return fh
891
892     def readdir(self, fh, off):
893         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
894
895         if fh in self._filehandles:
896             handle = self._filehandles[fh]
897         else:
898             raise llfuse.FUSEError(errno.EBADF)
899
900         _logger.debug("arv-mount handle.entry %s", handle.entry)
901
902         e = off
903         while e < len(handle.entry):
904             if handle.entry[e][1].inode in self.inodes:
905                 try:
906                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
907                 except UnicodeEncodeError:
908                     pass
909             e += 1
910
911     def releasedir(self, fh):
912         del self._filehandles[fh]
913
914     def statfs(self):
915         st = llfuse.StatvfsData()
916         st.f_bsize = 64 * 1024
917         st.f_blocks = 0
918         st.f_files = 0
919
920         st.f_bfree = 0
921         st.f_bavail = 0
922
923         st.f_ffree = 0
924         st.f_favail = 0
925
926         st.f_frsize = 0
927         return st
928
929     # The llfuse documentation recommends only overloading functions that
930     # are actually implemented, as the default implementation will raise ENOSYS.
931     # However, there is a bug in the llfuse default implementation of create()
932     # "create() takes exactly 5 positional arguments (6 given)" which will crash
933     # arv-mount.
934     # The workaround is to implement it with the proper number of parameters,
935     # and then everything works out.
936     def create(self, p1, p2, p3, p4, p5):
937         raise llfuse.FUSEError(errno.EROFS)