5095: Record mtime() from group#contents
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
25
26 _logger = logging.getLogger('arvados.arvados_fuse')
27
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
32
33 class SafeApi(object):
34     """Threadsafe wrapper for API object.
35
36     This stores and returns a different api object per thread, because
37     httplib2 which underlies apiclient is not threadsafe.
38     """
39
40     def __init__(self, config):
41         self.host = config.get('ARVADOS_API_HOST')
42         self.api_token = config.get('ARVADOS_API_TOKEN')
43         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
44         self.local = threading.local()
45         self.block_cache = arvados.KeepBlockCache()
46
47     def localapi(self):
48         if 'api' not in self.local.__dict__:
49             self.local.api = arvados.api(
50                 version='v1',
51                 host=self.host, token=self.api_token, insecure=self.insecure)
52         return self.local.api
53
54     def localkeep(self):
55         if 'keep' not in self.local.__dict__:
56             self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
57         return self.local.keep
58
59     def __getattr__(self, name):
60         # Proxy nonexistent attributes to the local API client.
61         try:
62             return getattr(self.localapi(), name)
63         except AttributeError:
64             return super(SafeApi, self).__getattr__(name)
65
66
67 def convertTime(t):
68     '''Parse Arvados timestamp to unix time.'''
69     try:
70         return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
71     except (TypeError, ValueError):
72         return 0
73
74 def sanitize_filename(dirty):
75     '''Replace disallowed filename characters with harmless "_".'''
76     if dirty is None:
77         return None
78     elif dirty == '':
79         return '_'
80     elif dirty == '.':
81         return '_'
82     elif dirty == '..':
83         return '__'
84     else:
85         return _disallowed_filename_characters.sub('_', dirty)
86
87
88 class FreshBase(object):
89     '''Base class for maintaining fresh/stale state to determine when to update.'''
90     def __init__(self):
91         self._stale = True
92         self._poll = False
93         self._last_update = time.time()
94         self._atime = time.time()
95         self._poll_time = 60
96
97     # Mark the value as stale
98     def invalidate(self):
99         self._stale = True
100
101     # Test if the entries dict is stale.
102     def stale(self):
103         if self._stale:
104             return True
105         if self._poll:
106             return (self._last_update + self._poll_time) < self._atime
107         return False
108
109     def fresh(self):
110         self._stale = False
111         self._last_update = time.time()
112
113     def atime(self):
114         return self._atime
115
116 class File(FreshBase):
117     '''Base for file objects.'''
118
119     def __init__(self, parent_inode, _mtime=0):
120         super(File, self).__init__()
121         self.inode = None
122         self.parent_inode = parent_inode
123         self._mtime = _mtime
124
125     def size(self):
126         return 0
127
128     def readfrom(self, off, size):
129         return ''
130
131     def mtime(self):
132         return self._mtime
133
134
135 class StreamReaderFile(File):
136     '''Wraps a StreamFileReader as a file.'''
137
138     def __init__(self, parent_inode, reader, _mtime):
139         super(StreamReaderFile, self).__init__(parent_inode, _mtime)
140         self.reader = reader
141
142     def size(self):
143         return self.reader.size()
144
145     def readfrom(self, off, size):
146         return self.reader.readfrom(off, size)
147
148     def stale(self):
149         return False
150
151
152 class StringFile(File):
153     '''Wrap a simple string as a file'''
154     def __init__(self, parent_inode, contents, _mtime):
155         super(StringFile, self).__init__(parent_inode, _mtime)
156         self.contents = contents
157
158     def size(self):
159         return len(self.contents)
160
161     def readfrom(self, off, size):
162         return self.contents[off:(off+size)]
163
164
165 class ObjectFile(StringFile):
166     '''Wrap a dict as a serialized json object.'''
167
168     def __init__(self, parent_inode, obj):
169         super(ObjectFile, self).__init__(parent_inode, "", 0)
170         self.uuid = obj['uuid']
171         self.update(obj)
172
173     def update(self, obj):
174         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
175         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
176
177
178 class Directory(FreshBase):
179     '''Generic directory object, backed by a dict.
180     Consists of a set of entries with the key representing the filename
181     and the value referencing a File or Directory object.
182     '''
183
184     def __init__(self, parent_inode):
185         super(Directory, self).__init__()
186
187         '''parent_inode is the integer inode number'''
188         self.inode = None
189         if not isinstance(parent_inode, int):
190             raise Exception("parent_inode should be an int")
191         self.parent_inode = parent_inode
192         self._entries = {}
193         self._mtime = time.time()
194
195     #  Overriden by subclasses to implement logic to update the entries dict
196     #  when the directory is stale
197     def update(self):
198         pass
199
200     # Only used when computing the size of the disk footprint of the directory
201     # (stub)
202     def size(self):
203         return 0
204
205     def checkupdate(self):
206         if self.stale():
207             try:
208                 self.update()
209             except apiclient.errors.HttpError as e:
210                 _logger.debug(e)
211
212     def __getitem__(self, item):
213         self.checkupdate()
214         return self._entries[item]
215
216     def items(self):
217         self.checkupdate()
218         return self._entries.items()
219
220     def __iter__(self):
221         self.checkupdate()
222         return self._entries.iterkeys()
223
224     def __contains__(self, k):
225         self.checkupdate()
226         return k in self._entries
227
228     def merge(self, items, fn, same, new_entry):
229         '''Helper method for updating the contents of the directory.  Takes a list
230         describing the new contents of the directory, reuse entries that are
231         the same in both the old and new lists, create new entries, and delete
232         old entries missing from the new list.
233
234         items: iterable with new directory contents
235
236         fn: function to take an entry in 'items' and return the desired file or
237         directory name, or None if this entry should be skipped
238
239         same: function to compare an existing entry (a File or Directory
240         object) with an entry in the items list to determine whether to keep
241         the existing entry.
242
243         new_entry: function to create a new directory entry (File or Directory
244         object) from an entry in the items list.
245
246         '''
247
248         oldentries = self._entries
249         self._entries = {}
250         changed = False
251         for i in items:
252             name = sanitize_filename(fn(i))
253             if name:
254                 if name in oldentries and same(oldentries[name], i):
255                     # move existing directory entry over
256                     self._entries[name] = oldentries[name]
257                     del oldentries[name]
258                 else:
259                     # create new directory entry
260                     ent = new_entry(i)
261                     if ent is not None:
262                         self._entries[name] = self.inodes.add_entry(ent)
263                         changed = True
264
265         # delete any other directory entries that were not in found in 'items'
266         for i in oldentries:
267             llfuse.invalidate_entry(self.inode, str(i))
268             self.inodes.del_entry(oldentries[i])
269             changed = True
270
271         if changed:
272             self._mtime = time.time()
273
274         self.fresh()
275
276     def clear(self):
277         '''Delete all entries'''
278         oldentries = self._entries
279         self._entries = {}
280         for n in oldentries:
281             if isinstance(n, Directory):
282                 n.clear()
283             llfuse.invalidate_entry(self.inode, str(n))
284             self.inodes.del_entry(oldentries[n])
285         llfuse.invalidate_inode(self.inode)
286         self.invalidate()
287
288     def mtime(self):
289         return self._mtime
290
291
292 class CollectionDirectory(Directory):
293     '''Represents the root of a directory tree holding a collection.'''
294
295     def __init__(self, parent_inode, inodes, api, num_retries, collection):
296         super(CollectionDirectory, self).__init__(parent_inode)
297         self.inodes = inodes
298         self.api = api
299         self.num_retries = num_retries
300         self.collection_object_file = None
301         self.collection_object = None
302         if isinstance(collection, dict):
303             self.collection_locator = collection['uuid']
304             self._mtime = convertTime(collection['modified_at']) if 'modified_at' in collection else 0
305         else:
306             self.collection_locator = collection
307
308     def same(self, i):
309         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
310
311     # Used by arv-web.py to switch the contents of the CollectionDirectory
312     def change_collection(self, new_locator):
313         """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
314         self.collection_locator = new_locator
315         self.collection_object = None
316         self.update()
317
318     def new_collection(self, new_collection_object, coll_reader):
319         self.collection_object = new_collection_object
320
321         if self.collection_object_file is not None:
322             self.collection_object_file.update(self.collection_object)
323
324         self.clear()
325         for s in coll_reader.all_streams():
326             cwd = self
327             for part in s.name().split('/'):
328                 if part != '' and part != '.':
329                     partname = sanitize_filename(part)
330                     if partname not in cwd._entries:
331                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
332                     cwd = cwd._entries[partname]
333             for k, v in s.files().items():
334                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
335
336     def update(self):
337         try:
338             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
339                 return True
340
341             if self.collection_locator is None:
342                 self.fresh()
343                 return True
344
345             with llfuse.lock_released:
346                 coll_reader = arvados.CollectionReader(
347                     self.collection_locator, self.api, self.api.localkeep(),
348                     num_retries=self.num_retries)
349                 new_collection_object = coll_reader.api_response() or {}
350                 # If the Collection only exists in Keep, there will be no API
351                 # response.  Fill in the fields we need.
352                 if 'uuid' not in new_collection_object:
353                     new_collection_object['uuid'] = self.collection_locator
354                 if "portable_data_hash" not in new_collection_object:
355                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
356                 if 'manifest_text' not in new_collection_object:
357                     new_collection_object['manifest_text'] = coll_reader.manifest_text()
358                 coll_reader.normalize()
359             # end with llfuse.lock_released, re-acquire lock
360
361             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
362                 self.new_collection(new_collection_object, coll_reader)
363
364             self.fresh()
365             return True
366         except arvados.errors.NotFoundError:
367             _logger.exception("arv-mount %s: error", self.collection_locator)
368         except arvados.errors.ArgumentError as detail:
369             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
370             if self.collection_object is not None and "manifest_text" in self.collection_object:
371                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
372         except Exception:
373             _logger.exception("arv-mount %s: error", self.collection_locator)
374             if self.collection_object is not None and "manifest_text" in self.collection_object:
375                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
376         return False
377
378     def __getitem__(self, item):
379         self.checkupdate()
380         if item == '.arvados#collection':
381             if self.collection_object_file is None:
382                 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
383                 self.inodes.add_entry(self.collection_object_file)
384             return self.collection_object_file
385         else:
386             return super(CollectionDirectory, self).__getitem__(item)
387
388     def __contains__(self, k):
389         if k == '.arvados#collection':
390             return True
391         else:
392             return super(CollectionDirectory, self).__contains__(k)
393
394     def mtime(self):
395         return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else self._mtime
396
397
398 class MagicDirectory(Directory):
399     '''A special directory that logically contains the set of all extant keep
400     locators.  When a file is referenced by lookup(), it is tested to see if it
401     is a valid keep locator to a manifest, and if so, loads the manifest
402     contents as a subdirectory of this directory with the locator as the
403     directory name.  Since querying a list of all extant keep locators is
404     impractical, only collections that have already been accessed are visible
405     to readdir().
406     '''
407
408     README_TEXT = '''
409 This directory provides access to Arvados collections as subdirectories listed
410 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
411 the form '1234567890abcdefghijklmnopqrstuv+123').
412
413 Note that this directory will appear empty until you attempt to access a
414 specific collection subdirectory (such as trying to 'cd' into it), at which
415 point the collection will actually be looked up on the server and the directory
416 will appear if it exists.
417 '''.lstrip()
418
419     def __init__(self, parent_inode, inodes, api, num_retries):
420         super(MagicDirectory, self).__init__(parent_inode)
421         self.inodes = inodes
422         self.api = api
423         self.num_retries = num_retries
424
425     def __setattr__(self, name, value):
426         super(MagicDirectory, self).__setattr__(name, value)
427         # When we're assigned an inode, add a README.
428         if ((name == 'inode') and (self.inode is not None) and
429               (not self._entries)):
430             self._entries['README'] = self.inodes.add_entry(
431                 StringFile(self.inode, self.README_TEXT, time.time()))
432             # If we're the root directory, add an identical by_id subdirectory.
433             if self.inode == llfuse.ROOT_INODE:
434                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
435                         self.inode, self.inodes, self.api, self.num_retries))
436
437     def __contains__(self, k):
438         if k in self._entries:
439             return True
440
441         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
442             return False
443
444         try:
445             e = self.inodes.add_entry(CollectionDirectory(
446                     self.inode, self.inodes, self.api, self.num_retries, k))
447             if e.update():
448                 self._entries[k] = e
449                 return True
450             else:
451                 return False
452         except Exception as e:
453             _logger.debug('arv-mount exception keep %s', e)
454             return False
455
456     def __getitem__(self, item):
457         if item in self:
458             return self._entries[item]
459         else:
460             raise KeyError("No collection with id " + item)
461
462
463 class RecursiveInvalidateDirectory(Directory):
464     def invalidate(self):
465         if self.inode == llfuse.ROOT_INODE:
466             llfuse.lock.acquire()
467         try:
468             super(RecursiveInvalidateDirectory, self).invalidate()
469             for a in self._entries:
470                 self._entries[a].invalidate()
471         except Exception:
472             _logger.exception()
473         finally:
474             if self.inode == llfuse.ROOT_INODE:
475                 llfuse.lock.release()
476
477
478 class TagsDirectory(RecursiveInvalidateDirectory):
479     '''A special directory that contains as subdirectories all tags visible to the user.'''
480
481     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
482         super(TagsDirectory, self).__init__(parent_inode)
483         self.inodes = inodes
484         self.api = api
485         self.num_retries = num_retries
486         self._poll = True
487         self._poll_time = poll_time
488
489     def update(self):
490         with llfuse.lock_released:
491             tags = self.api.links().list(
492                 filters=[['link_class', '=', 'tag']],
493                 select=['name'], distinct=True
494                 ).execute(num_retries=self.num_retries)
495         if "items" in tags:
496             self.merge(tags['items'],
497                        lambda i: i['name'],
498                        lambda a, i: a.tag == i['name'],
499                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
500
501
502 class TagDirectory(Directory):
503     '''A special directory that contains as subdirectories all collections visible
504     to the user that are tagged with a particular tag.
505     '''
506
507     def __init__(self, parent_inode, inodes, api, num_retries, tag,
508                  poll=False, poll_time=60):
509         super(TagDirectory, self).__init__(parent_inode)
510         self.inodes = inodes
511         self.api = api
512         self.num_retries = num_retries
513         self.tag = tag
514         self._poll = poll
515         self._poll_time = poll_time
516
517     def update(self):
518         with llfuse.lock_released:
519             taggedcollections = self.api.links().list(
520                 filters=[['link_class', '=', 'tag'],
521                          ['name', '=', self.tag],
522                          ['head_uuid', 'is_a', 'arvados#collection']],
523                 select=['head_uuid']
524                 ).execute(num_retries=self.num_retries)
525         self.merge(taggedcollections['items'],
526                    lambda i: i['head_uuid'],
527                    lambda a, i: a.collection_locator == i['head_uuid'],
528                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
529
530
531 class ProjectDirectory(Directory):
532     '''A special directory that contains the contents of a project.'''
533
534     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
535                  poll=False, poll_time=60):
536         super(ProjectDirectory, self).__init__(parent_inode)
537         self.inodes = inodes
538         self.api = api
539         self.num_retries = num_retries
540         self.project_object = project_object
541         self.project_object_file = None
542         self.uuid = project_object['uuid']
543         self._poll = poll
544         self._poll_time = poll_time
545
546     def createDirectory(self, i):
547         if collection_uuid_pattern.match(i['uuid']):
548             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
549         elif group_uuid_pattern.match(i['uuid']):
550             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
551         elif link_uuid_pattern.match(i['uuid']):
552             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
553                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
554             else:
555                 return None
556         elif uuid_pattern.match(i['uuid']):
557             return ObjectFile(self.parent_inode, i)
558         else:
559             return None
560
561     def update(self):
562         if self.project_object_file == None:
563             self.project_object_file = ObjectFile(self.inode, self.project_object)
564             self.inodes.add_entry(self.project_object_file)
565
566         def namefn(i):
567             if 'name' in i:
568                 if i['name'] is None or len(i['name']) == 0:
569                     return None
570                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
571                     # collection or subproject
572                     return i['name']
573                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
574                     # name link
575                     return i['name']
576                 elif 'kind' in i and i['kind'].startswith('arvados#'):
577                     # something else
578                     return "{}.{}".format(i['name'], i['kind'][8:])
579             else:
580                 return None
581
582         def samefn(a, i):
583             if isinstance(a, CollectionDirectory):
584                 return a.collection_locator == i['uuid']
585             elif isinstance(a, ProjectDirectory):
586                 return a.uuid == i['uuid']
587             elif isinstance(a, ObjectFile):
588                 return a.uuid == i['uuid'] and not a.stale()
589             return False
590
591         with llfuse.lock_released:
592             if group_uuid_pattern.match(self.uuid):
593                 self.project_object = self.api.groups().get(
594                     uuid=self.uuid).execute(num_retries=self.num_retries)
595             elif user_uuid_pattern.match(self.uuid):
596                 self.project_object = self.api.users().get(
597                     uuid=self.uuid).execute(num_retries=self.num_retries)
598
599             contents = arvados.util.list_all(self.api.groups().contents,
600                                              self.num_retries, uuid=self.uuid)
601             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
602             contents += arvados.util.list_all(
603                 self.api.links().list, self.num_retries,
604                 filters=[['tail_uuid', '=', self.uuid],
605                          ['link_class', '=', 'name']])
606
607         # end with llfuse.lock_released, re-acquire lock
608
609         self.merge(contents,
610                    namefn,
611                    samefn,
612                    self.createDirectory)
613
614     def __getitem__(self, item):
615         self.checkupdate()
616         if item == '.arvados#project':
617             return self.project_object_file
618         else:
619             return super(ProjectDirectory, self).__getitem__(item)
620
621     def __contains__(self, k):
622         if k == '.arvados#project':
623             return True
624         else:
625             return super(ProjectDirectory, self).__contains__(k)
626
627
628 class SharedDirectory(Directory):
629     '''A special directory that represents users or groups who have shared projects with me.'''
630
631     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
632                  poll=False, poll_time=60):
633         super(SharedDirectory, self).__init__(parent_inode)
634         self.inodes = inodes
635         self.api = api
636         self.num_retries = num_retries
637         self.current_user = api.users().current().execute(num_retries=num_retries)
638         self._poll = True
639         self._poll_time = poll_time
640
641     def update(self):
642         with llfuse.lock_released:
643             all_projects = arvados.util.list_all(
644                 self.api.groups().list, self.num_retries,
645                 filters=[['group_class','=','project']])
646             objects = {}
647             for ob in all_projects:
648                 objects[ob['uuid']] = ob
649
650             roots = []
651             root_owners = {}
652             for ob in all_projects:
653                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
654                     roots.append(ob)
655                     root_owners[ob['owner_uuid']] = True
656
657             lusers = arvados.util.list_all(
658                 self.api.users().list, self.num_retries,
659                 filters=[['uuid','in', list(root_owners)]])
660             lgroups = arvados.util.list_all(
661                 self.api.groups().list, self.num_retries,
662                 filters=[['uuid','in', list(root_owners)]])
663
664             users = {}
665             groups = {}
666
667             for l in lusers:
668                 objects[l["uuid"]] = l
669             for l in lgroups:
670                 objects[l["uuid"]] = l
671
672             contents = {}
673             for r in root_owners:
674                 if r in objects:
675                     obr = objects[r]
676                     if "name" in obr:
677                         contents[obr["name"]] = obr
678                     if "first_name" in obr:
679                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
680
681             for r in roots:
682                 if r['owner_uuid'] not in objects:
683                     contents[r['name']] = r
684
685         # end with llfuse.lock_released, re-acquire lock
686
687         try:
688             self.merge(contents.items(),
689                        lambda i: i[0],
690                        lambda a, i: a.uuid == i[1]['uuid'],
691                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
692         except Exception:
693             _logger.exception()
694
695
696 class FileHandle(object):
697     '''Connects a numeric file handle to a File or Directory object that has
698     been opened by the client.'''
699
700     def __init__(self, fh, entry):
701         self.fh = fh
702         self.entry = entry
703
704
705 class Inodes(object):
706     '''Manage the set of inodes.  This is the mapping from a numeric id
707     to a concrete File or Directory object'''
708
709     def __init__(self):
710         self._entries = {}
711         self._counter = itertools.count(llfuse.ROOT_INODE)
712
713     def __getitem__(self, item):
714         return self._entries[item]
715
716     def __setitem__(self, key, item):
717         self._entries[key] = item
718
719     def __iter__(self):
720         return self._entries.iterkeys()
721
722     def items(self):
723         return self._entries.items()
724
725     def __contains__(self, k):
726         return k in self._entries
727
728     def add_entry(self, entry):
729         entry.inode = next(self._counter)
730         self._entries[entry.inode] = entry
731         return entry
732
733     def del_entry(self, entry):
734         llfuse.invalidate_inode(entry.inode)
735         del self._entries[entry.inode]
736
737 class Operations(llfuse.Operations):
738     '''This is the main interface with llfuse.  The methods on this object are
739     called by llfuse threads to service FUSE events to query and read from
740     the file system.
741
742     llfuse has its own global lock which is acquired before calling a request handler,
743     so request handlers do not run concurrently unless the lock is explicitly released
744     using "with llfuse.lock_released:"'''
745
746     def __init__(self, uid, gid, encoding="utf-8"):
747         super(Operations, self).__init__()
748
749         self.inodes = Inodes()
750         self.uid = uid
751         self.gid = gid
752         self.encoding = encoding
753
754         # dict of inode to filehandle
755         self._filehandles = {}
756         self._filehandles_counter = 1
757
758         # Other threads that need to wait until the fuse driver
759         # is fully initialized should wait() on this event object.
760         self.initlock = threading.Event()
761
762     def init(self):
763         # Allow threads that are waiting for the driver to be finished
764         # initializing to continue
765         self.initlock.set()
766
767     def access(self, inode, mode, ctx):
768         return True
769
770     def getattr(self, inode):
771         if inode not in self.inodes:
772             raise llfuse.FUSEError(errno.ENOENT)
773
774         e = self.inodes[inode]
775
776         entry = llfuse.EntryAttributes()
777         entry.st_ino = inode
778         entry.generation = 0
779         entry.entry_timeout = 300
780         entry.attr_timeout = 300
781
782         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
783         if isinstance(e, Directory):
784             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
785         elif isinstance(e, StreamReaderFile):
786             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
787         else:
788             entry.st_mode |= stat.S_IFREG
789
790         entry.st_nlink = 1
791         entry.st_uid = self.uid
792         entry.st_gid = self.gid
793         entry.st_rdev = 0
794
795         entry.st_size = e.size()
796
797         entry.st_blksize = 512
798         entry.st_blocks = (e.size()/512)+1
799         entry.st_atime = int(e.atime())
800         entry.st_mtime = int(e.mtime())
801         entry.st_ctime = int(e.mtime())
802
803         return entry
804
805     def lookup(self, parent_inode, name):
806         name = unicode(name, self.encoding)
807         _logger.debug("arv-mount lookup: parent_inode %i name %s",
808                       parent_inode, name)
809         inode = None
810
811         if name == '.':
812             inode = parent_inode
813         else:
814             if parent_inode in self.inodes:
815                 p = self.inodes[parent_inode]
816                 if name == '..':
817                     inode = p.parent_inode
818                 elif isinstance(p, Directory) and name in p:
819                     inode = p[name].inode
820
821         if inode != None:
822             return self.getattr(inode)
823         else:
824             raise llfuse.FUSEError(errno.ENOENT)
825
826     def open(self, inode, flags):
827         if inode in self.inodes:
828             p = self.inodes[inode]
829         else:
830             raise llfuse.FUSEError(errno.ENOENT)
831
832         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
833             raise llfuse.FUSEError(errno.EROFS)
834
835         if isinstance(p, Directory):
836             raise llfuse.FUSEError(errno.EISDIR)
837
838         fh = self._filehandles_counter
839         self._filehandles_counter += 1
840         self._filehandles[fh] = FileHandle(fh, p)
841         return fh
842
843     def read(self, fh, off, size):
844         _logger.debug("arv-mount read %i %i %i", fh, off, size)
845         if fh in self._filehandles:
846             handle = self._filehandles[fh]
847         else:
848             raise llfuse.FUSEError(errno.EBADF)
849
850         # update atime
851         handle.entry._atime = time.time()
852
853         try:
854             with llfuse.lock_released:
855                 return handle.entry.readfrom(off, size)
856         except arvados.errors.NotFoundError as e:
857             _logger.warning("Block not found: " + str(e))
858             raise llfuse.FUSEError(errno.EIO)
859         except Exception:
860             _logger.exception()
861             raise llfuse.FUSEError(errno.EIO)
862
863     def release(self, fh):
864         if fh in self._filehandles:
865             del self._filehandles[fh]
866
867     def opendir(self, inode):
868         _logger.debug("arv-mount opendir: inode %i", inode)
869
870         if inode in self.inodes:
871             p = self.inodes[inode]
872         else:
873             raise llfuse.FUSEError(errno.ENOENT)
874
875         if not isinstance(p, Directory):
876             raise llfuse.FUSEError(errno.ENOTDIR)
877
878         fh = self._filehandles_counter
879         self._filehandles_counter += 1
880         if p.parent_inode in self.inodes:
881             parent = self.inodes[p.parent_inode]
882         else:
883             raise llfuse.FUSEError(errno.EIO)
884
885         # update atime
886         p._atime = time.time()
887
888         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
889         return fh
890
891     def readdir(self, fh, off):
892         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
893
894         if fh in self._filehandles:
895             handle = self._filehandles[fh]
896         else:
897             raise llfuse.FUSEError(errno.EBADF)
898
899         _logger.debug("arv-mount handle.entry %s", handle.entry)
900
901         e = off
902         while e < len(handle.entry):
903             if handle.entry[e][1].inode in self.inodes:
904                 try:
905                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
906                 except UnicodeEncodeError:
907                     pass
908             e += 1
909
910     def releasedir(self, fh):
911         del self._filehandles[fh]
912
913     def statfs(self):
914         st = llfuse.StatvfsData()
915         st.f_bsize = 64 * 1024
916         st.f_blocks = 0
917         st.f_files = 0
918
919         st.f_bfree = 0
920         st.f_bavail = 0
921
922         st.f_ffree = 0
923         st.f_favail = 0
924
925         st.f_frsize = 0
926         return st
927
928     # The llfuse documentation recommends only overloading functions that
929     # are actually implemented, as the default implementation will raise ENOSYS.
930     # However, there is a bug in the llfuse default implementation of create()
931     # "create() takes exactly 5 positional arguments (6 given)" which will crash
932     # arv-mount.
933     # The workaround is to implement it with the proper number of parameters,
934     # and then everything works out.
935     def create(self, p1, p2, p3, p4, p5):
936         raise llfuse.FUSEError(errno.EROFS)