Merge branch '3785-job-log-collection-owner' closes #3785
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24
25 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
26
27 _logger = logging.getLogger('arvados.arvados_fuse')
28
29 # Match any character which FUSE or Linux cannot accommodate as part
30 # of a filename. (If present in a collection filename, they will
31 # appear as underscores in the fuse mount.)
32 _disallowed_filename_characters = re.compile('[\x00/]')
33
34 class SafeApi(object):
35     """Threadsafe wrapper for API object.
36
37     This stores and returns a different api object per thread, because
38     httplib2 which underlies apiclient is not threadsafe.
39     """
40
41     def __init__(self, config):
42         self.host = config.get('ARVADOS_API_HOST')
43         self.api_token = config.get('ARVADOS_API_TOKEN')
44         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
45         self.local = threading.local()
46         self.block_cache = arvados.KeepBlockCache()
47
48     def localapi(self):
49         if 'api' not in self.local.__dict__:
50             self.local.api = arvados.api(
51                 version='v1',
52                 host=self.host, token=self.api_token, insecure=self.insecure)
53         return self.local.api
54
55     def localkeep(self):
56         if 'keep' not in self.local.__dict__:
57             self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
58         return self.local.keep
59
60     def __getattr__(self, name):
61         # Proxy nonexistent attributes to the local API client.
62         try:
63             return getattr(self.localapi(), name)
64         except AttributeError:
65             return super(SafeApi, self).__getattr__(name)
66
67
68 def convertTime(t):
69     """Parse Arvados timestamp to unix time."""
70     if not t:
71         return 0
72     try:
73         return calendar.timegm(ciso8601.parse_datetime_unaware(t).timetuple())
74     except (TypeError, ValueError):
75         return 0
76
77 def sanitize_filename(dirty):
78     '''Replace disallowed filename characters with harmless "_".'''
79     if dirty is None:
80         return None
81     elif dirty == '':
82         return '_'
83     elif dirty == '.':
84         return '_'
85     elif dirty == '..':
86         return '__'
87     else:
88         return _disallowed_filename_characters.sub('_', dirty)
89
90
91 class FreshBase(object):
92     '''Base class for maintaining fresh/stale state to determine when to update.'''
93     def __init__(self):
94         self._stale = True
95         self._poll = False
96         self._last_update = time.time()
97         self._atime = time.time()
98         self._poll_time = 60
99
100     # Mark the value as stale
101     def invalidate(self):
102         self._stale = True
103
104     # Test if the entries dict is stale.
105     def stale(self):
106         if self._stale:
107             return True
108         if self._poll:
109             return (self._last_update + self._poll_time) < self._atime
110         return False
111
112     def fresh(self):
113         self._stale = False
114         self._last_update = time.time()
115
116     def atime(self):
117         return self._atime
118
119 class File(FreshBase):
120     '''Base for file objects.'''
121
122     def __init__(self, parent_inode, _mtime=0):
123         super(File, self).__init__()
124         self.inode = None
125         self.parent_inode = parent_inode
126         self._mtime = _mtime
127
128     def size(self):
129         return 0
130
131     def readfrom(self, off, size):
132         return ''
133
134     def mtime(self):
135         return self._mtime
136
137
138 class StreamReaderFile(File):
139     '''Wraps a StreamFileReader as a file.'''
140
141     def __init__(self, parent_inode, reader, _mtime):
142         super(StreamReaderFile, self).__init__(parent_inode, _mtime)
143         self.reader = reader
144
145     def size(self):
146         return self.reader.size()
147
148     def readfrom(self, off, size):
149         return self.reader.readfrom(off, size)
150
151     def stale(self):
152         return False
153
154
155 class StringFile(File):
156     '''Wrap a simple string as a file'''
157     def __init__(self, parent_inode, contents, _mtime):
158         super(StringFile, self).__init__(parent_inode, _mtime)
159         self.contents = contents
160
161     def size(self):
162         return len(self.contents)
163
164     def readfrom(self, off, size):
165         return self.contents[off:(off+size)]
166
167
168 class ObjectFile(StringFile):
169     '''Wrap a dict as a serialized json object.'''
170
171     def __init__(self, parent_inode, obj):
172         super(ObjectFile, self).__init__(parent_inode, "", 0)
173         self.uuid = obj['uuid']
174         self.update(obj)
175
176     def update(self, obj):
177         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
178         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
179
180
181 class Directory(FreshBase):
182     '''Generic directory object, backed by a dict.
183     Consists of a set of entries with the key representing the filename
184     and the value referencing a File or Directory object.
185     '''
186
187     def __init__(self, parent_inode):
188         super(Directory, self).__init__()
189
190         '''parent_inode is the integer inode number'''
191         self.inode = None
192         if not isinstance(parent_inode, int):
193             raise Exception("parent_inode should be an int")
194         self.parent_inode = parent_inode
195         self._entries = {}
196         self._mtime = time.time()
197
198     #  Overriden by subclasses to implement logic to update the entries dict
199     #  when the directory is stale
200     def update(self):
201         pass
202
203     # Only used when computing the size of the disk footprint of the directory
204     # (stub)
205     def size(self):
206         return 0
207
208     def checkupdate(self):
209         if self.stale():
210             try:
211                 self.update()
212             except apiclient.errors.HttpError as e:
213                 _logger.debug(e)
214
215     def __getitem__(self, item):
216         self.checkupdate()
217         return self._entries[item]
218
219     def items(self):
220         self.checkupdate()
221         return self._entries.items()
222
223     def __iter__(self):
224         self.checkupdate()
225         return self._entries.iterkeys()
226
227     def __contains__(self, k):
228         self.checkupdate()
229         return k in self._entries
230
231     def merge(self, items, fn, same, new_entry):
232         '''Helper method for updating the contents of the directory.  Takes a list
233         describing the new contents of the directory, reuse entries that are
234         the same in both the old and new lists, create new entries, and delete
235         old entries missing from the new list.
236
237         items: iterable with new directory contents
238
239         fn: function to take an entry in 'items' and return the desired file or
240         directory name, or None if this entry should be skipped
241
242         same: function to compare an existing entry (a File or Directory
243         object) with an entry in the items list to determine whether to keep
244         the existing entry.
245
246         new_entry: function to create a new directory entry (File or Directory
247         object) from an entry in the items list.
248
249         '''
250
251         oldentries = self._entries
252         self._entries = {}
253         changed = False
254         for i in items:
255             name = sanitize_filename(fn(i))
256             if name:
257                 if name in oldentries and same(oldentries[name], i):
258                     # move existing directory entry over
259                     self._entries[name] = oldentries[name]
260                     del oldentries[name]
261                 else:
262                     # create new directory entry
263                     ent = new_entry(i)
264                     if ent is not None:
265                         self._entries[name] = self.inodes.add_entry(ent)
266                         changed = True
267
268         # delete any other directory entries that were not in found in 'items'
269         for i in oldentries:
270             llfuse.invalidate_entry(self.inode, str(i))
271             self.inodes.del_entry(oldentries[i])
272             changed = True
273
274         if changed:
275             self._mtime = time.time()
276
277         self.fresh()
278
279     def clear(self):
280         '''Delete all entries'''
281         oldentries = self._entries
282         self._entries = {}
283         for n in oldentries:
284             if isinstance(n, Directory):
285                 n.clear()
286             llfuse.invalidate_entry(self.inode, str(n))
287             self.inodes.del_entry(oldentries[n])
288         llfuse.invalidate_inode(self.inode)
289         self.invalidate()
290
291     def mtime(self):
292         return self._mtime
293
294
295 class CollectionDirectory(Directory):
296     '''Represents the root of a directory tree holding a collection.'''
297
298     def __init__(self, parent_inode, inodes, api, num_retries, collection):
299         super(CollectionDirectory, self).__init__(parent_inode)
300         self.inodes = inodes
301         self.api = api
302         self.num_retries = num_retries
303         self.collection_object_file = None
304         self.collection_object = None
305         if isinstance(collection, dict):
306             self.collection_locator = collection['uuid']
307             self._mtime = convertTime(collection.get('modified_at'))
308         else:
309             self.collection_locator = collection
310             self._mtime = 0
311
312     def same(self, i):
313         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
314
315     # Used by arv-web.py to switch the contents of the CollectionDirectory
316     def change_collection(self, new_locator):
317         """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
318         self.collection_locator = new_locator
319         self.collection_object = None
320         self.update()
321
322     def new_collection(self, new_collection_object, coll_reader):
323         self.collection_object = new_collection_object
324
325         self._mtime = convertTime(self.collection_object.get('modified_at'))
326
327         if self.collection_object_file is not None:
328             self.collection_object_file.update(self.collection_object)
329
330         self.clear()
331         for s in coll_reader.all_streams():
332             cwd = self
333             for part in s.name().split('/'):
334                 if part != '' and part != '.':
335                     partname = sanitize_filename(part)
336                     if partname not in cwd._entries:
337                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
338                     cwd = cwd._entries[partname]
339             for k, v in s.files().items():
340                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
341
342     def update(self):
343         try:
344             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
345                 return True
346
347             if self.collection_locator is None:
348                 self.fresh()
349                 return True
350
351             with llfuse.lock_released:
352                 coll_reader = arvados.CollectionReader(
353                     self.collection_locator, self.api, self.api.localkeep(),
354                     num_retries=self.num_retries)
355                 new_collection_object = coll_reader.api_response() or {}
356                 # If the Collection only exists in Keep, there will be no API
357                 # response.  Fill in the fields we need.
358                 if 'uuid' not in new_collection_object:
359                     new_collection_object['uuid'] = self.collection_locator
360                 if "portable_data_hash" not in new_collection_object:
361                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
362                 if 'manifest_text' not in new_collection_object:
363                     new_collection_object['manifest_text'] = coll_reader.manifest_text()
364                 coll_reader.normalize()
365             # end with llfuse.lock_released, re-acquire lock
366
367             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
368                 self.new_collection(new_collection_object, coll_reader)
369
370             self.fresh()
371             return True
372         except arvados.errors.NotFoundError:
373             _logger.exception("arv-mount %s: error", self.collection_locator)
374         except arvados.errors.ArgumentError as detail:
375             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
376             if self.collection_object is not None and "manifest_text" in self.collection_object:
377                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
378         except Exception:
379             _logger.exception("arv-mount %s: error", self.collection_locator)
380             if self.collection_object is not None and "manifest_text" in self.collection_object:
381                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
382         return False
383
384     def __getitem__(self, item):
385         self.checkupdate()
386         if item == '.arvados#collection':
387             if self.collection_object_file is None:
388                 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
389                 self.inodes.add_entry(self.collection_object_file)
390             return self.collection_object_file
391         else:
392             return super(CollectionDirectory, self).__getitem__(item)
393
394     def __contains__(self, k):
395         if k == '.arvados#collection':
396             return True
397         else:
398             return super(CollectionDirectory, self).__contains__(k)
399
400
401 class MagicDirectory(Directory):
402     '''A special directory that logically contains the set of all extant keep
403     locators.  When a file is referenced by lookup(), it is tested to see if it
404     is a valid keep locator to a manifest, and if so, loads the manifest
405     contents as a subdirectory of this directory with the locator as the
406     directory name.  Since querying a list of all extant keep locators is
407     impractical, only collections that have already been accessed are visible
408     to readdir().
409     '''
410
411     README_TEXT = '''
412 This directory provides access to Arvados collections as subdirectories listed
413 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
414 the form '1234567890abcdefghijklmnopqrstuv+123').
415
416 Note that this directory will appear empty until you attempt to access a
417 specific collection subdirectory (such as trying to 'cd' into it), at which
418 point the collection will actually be looked up on the server and the directory
419 will appear if it exists.
420 '''.lstrip()
421
422     def __init__(self, parent_inode, inodes, api, num_retries):
423         super(MagicDirectory, self).__init__(parent_inode)
424         self.inodes = inodes
425         self.api = api
426         self.num_retries = num_retries
427
428     def __setattr__(self, name, value):
429         super(MagicDirectory, self).__setattr__(name, value)
430         # When we're assigned an inode, add a README.
431         if ((name == 'inode') and (self.inode is not None) and
432               (not self._entries)):
433             self._entries['README'] = self.inodes.add_entry(
434                 StringFile(self.inode, self.README_TEXT, time.time()))
435             # If we're the root directory, add an identical by_id subdirectory.
436             if self.inode == llfuse.ROOT_INODE:
437                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
438                         self.inode, self.inodes, self.api, self.num_retries))
439
440     def __contains__(self, k):
441         if k in self._entries:
442             return True
443
444         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
445             return False
446
447         try:
448             e = self.inodes.add_entry(CollectionDirectory(
449                     self.inode, self.inodes, self.api, self.num_retries, k))
450             if e.update():
451                 self._entries[k] = e
452                 return True
453             else:
454                 return False
455         except Exception as e:
456             _logger.debug('arv-mount exception keep %s', e)
457             return False
458
459     def __getitem__(self, item):
460         if item in self:
461             return self._entries[item]
462         else:
463             raise KeyError("No collection with id " + item)
464
465
466 class RecursiveInvalidateDirectory(Directory):
467     def invalidate(self):
468         if self.inode == llfuse.ROOT_INODE:
469             llfuse.lock.acquire()
470         try:
471             super(RecursiveInvalidateDirectory, self).invalidate()
472             for a in self._entries:
473                 self._entries[a].invalidate()
474         except Exception:
475             _logger.exception()
476         finally:
477             if self.inode == llfuse.ROOT_INODE:
478                 llfuse.lock.release()
479
480
481 class TagsDirectory(RecursiveInvalidateDirectory):
482     '''A special directory that contains as subdirectories all tags visible to the user.'''
483
484     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
485         super(TagsDirectory, self).__init__(parent_inode)
486         self.inodes = inodes
487         self.api = api
488         self.num_retries = num_retries
489         self._poll = True
490         self._poll_time = poll_time
491
492     def update(self):
493         with llfuse.lock_released:
494             tags = self.api.links().list(
495                 filters=[['link_class', '=', 'tag']],
496                 select=['name'], distinct=True
497                 ).execute(num_retries=self.num_retries)
498         if "items" in tags:
499             self.merge(tags['items'],
500                        lambda i: i['name'],
501                        lambda a, i: a.tag == i['name'],
502                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
503
504
505 class TagDirectory(Directory):
506     '''A special directory that contains as subdirectories all collections visible
507     to the user that are tagged with a particular tag.
508     '''
509
510     def __init__(self, parent_inode, inodes, api, num_retries, tag,
511                  poll=False, poll_time=60):
512         super(TagDirectory, self).__init__(parent_inode)
513         self.inodes = inodes
514         self.api = api
515         self.num_retries = num_retries
516         self.tag = tag
517         self._poll = poll
518         self._poll_time = poll_time
519
520     def update(self):
521         with llfuse.lock_released:
522             taggedcollections = self.api.links().list(
523                 filters=[['link_class', '=', 'tag'],
524                          ['name', '=', self.tag],
525                          ['head_uuid', 'is_a', 'arvados#collection']],
526                 select=['head_uuid']
527                 ).execute(num_retries=self.num_retries)
528         self.merge(taggedcollections['items'],
529                    lambda i: i['head_uuid'],
530                    lambda a, i: a.collection_locator == i['head_uuid'],
531                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
532
533
534 class ProjectDirectory(Directory):
535     '''A special directory that contains the contents of a project.'''
536
537     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
538                  poll=False, poll_time=60):
539         super(ProjectDirectory, self).__init__(parent_inode)
540         self.inodes = inodes
541         self.api = api
542         self.num_retries = num_retries
543         self.project_object = project_object
544         self.project_object_file = None
545         self.uuid = project_object['uuid']
546         self._poll = poll
547         self._poll_time = poll_time
548
549     def createDirectory(self, i):
550         if collection_uuid_pattern.match(i['uuid']):
551             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
552         elif group_uuid_pattern.match(i['uuid']):
553             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
554         elif link_uuid_pattern.match(i['uuid']):
555             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
556                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
557             else:
558                 return None
559         elif uuid_pattern.match(i['uuid']):
560             return ObjectFile(self.parent_inode, i)
561         else:
562             return None
563
564     def update(self):
565         if self.project_object_file == None:
566             self.project_object_file = ObjectFile(self.inode, self.project_object)
567             self.inodes.add_entry(self.project_object_file)
568
569         def namefn(i):
570             if 'name' in i:
571                 if i['name'] is None or len(i['name']) == 0:
572                     return None
573                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
574                     # collection or subproject
575                     return i['name']
576                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
577                     # name link
578                     return i['name']
579                 elif 'kind' in i and i['kind'].startswith('arvados#'):
580                     # something else
581                     return "{}.{}".format(i['name'], i['kind'][8:])
582             else:
583                 return None
584
585         def samefn(a, i):
586             if isinstance(a, CollectionDirectory):
587                 return a.collection_locator == i['uuid']
588             elif isinstance(a, ProjectDirectory):
589                 return a.uuid == i['uuid']
590             elif isinstance(a, ObjectFile):
591                 return a.uuid == i['uuid'] and not a.stale()
592             return False
593
594         with llfuse.lock_released:
595             if group_uuid_pattern.match(self.uuid):
596                 self.project_object = self.api.groups().get(
597                     uuid=self.uuid).execute(num_retries=self.num_retries)
598             elif user_uuid_pattern.match(self.uuid):
599                 self.project_object = self.api.users().get(
600                     uuid=self.uuid).execute(num_retries=self.num_retries)
601
602             contents = arvados.util.list_all(self.api.groups().contents,
603                                              self.num_retries, uuid=self.uuid)
604             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
605             contents += arvados.util.list_all(
606                 self.api.links().list, self.num_retries,
607                 filters=[['tail_uuid', '=', self.uuid],
608                          ['link_class', '=', 'name']])
609
610         # end with llfuse.lock_released, re-acquire lock
611
612         self.merge(contents,
613                    namefn,
614                    samefn,
615                    self.createDirectory)
616
617     def __getitem__(self, item):
618         self.checkupdate()
619         if item == '.arvados#project':
620             return self.project_object_file
621         else:
622             return super(ProjectDirectory, self).__getitem__(item)
623
624     def __contains__(self, k):
625         if k == '.arvados#project':
626             return True
627         else:
628             return super(ProjectDirectory, self).__contains__(k)
629
630
631 class SharedDirectory(Directory):
632     '''A special directory that represents users or groups who have shared projects with me.'''
633
634     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
635                  poll=False, poll_time=60):
636         super(SharedDirectory, self).__init__(parent_inode)
637         self.inodes = inodes
638         self.api = api
639         self.num_retries = num_retries
640         self.current_user = api.users().current().execute(num_retries=num_retries)
641         self._poll = True
642         self._poll_time = poll_time
643
644     def update(self):
645         with llfuse.lock_released:
646             all_projects = arvados.util.list_all(
647                 self.api.groups().list, self.num_retries,
648                 filters=[['group_class','=','project']])
649             objects = {}
650             for ob in all_projects:
651                 objects[ob['uuid']] = ob
652
653             roots = []
654             root_owners = {}
655             for ob in all_projects:
656                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
657                     roots.append(ob)
658                     root_owners[ob['owner_uuid']] = True
659
660             lusers = arvados.util.list_all(
661                 self.api.users().list, self.num_retries,
662                 filters=[['uuid','in', list(root_owners)]])
663             lgroups = arvados.util.list_all(
664                 self.api.groups().list, self.num_retries,
665                 filters=[['uuid','in', list(root_owners)]])
666
667             users = {}
668             groups = {}
669
670             for l in lusers:
671                 objects[l["uuid"]] = l
672             for l in lgroups:
673                 objects[l["uuid"]] = l
674
675             contents = {}
676             for r in root_owners:
677                 if r in objects:
678                     obr = objects[r]
679                     if "name" in obr:
680                         contents[obr["name"]] = obr
681                     if "first_name" in obr:
682                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
683
684             for r in roots:
685                 if r['owner_uuid'] not in objects:
686                     contents[r['name']] = r
687
688         # end with llfuse.lock_released, re-acquire lock
689
690         try:
691             self.merge(contents.items(),
692                        lambda i: i[0],
693                        lambda a, i: a.uuid == i[1]['uuid'],
694                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
695         except Exception:
696             _logger.exception()
697
698
699 class FileHandle(object):
700     '''Connects a numeric file handle to a File or Directory object that has
701     been opened by the client.'''
702
703     def __init__(self, fh, entry):
704         self.fh = fh
705         self.entry = entry
706
707
708 class Inodes(object):
709     '''Manage the set of inodes.  This is the mapping from a numeric id
710     to a concrete File or Directory object'''
711
712     def __init__(self):
713         self._entries = {}
714         self._counter = itertools.count(llfuse.ROOT_INODE)
715
716     def __getitem__(self, item):
717         return self._entries[item]
718
719     def __setitem__(self, key, item):
720         self._entries[key] = item
721
722     def __iter__(self):
723         return self._entries.iterkeys()
724
725     def items(self):
726         return self._entries.items()
727
728     def __contains__(self, k):
729         return k in self._entries
730
731     def add_entry(self, entry):
732         entry.inode = next(self._counter)
733         self._entries[entry.inode] = entry
734         return entry
735
736     def del_entry(self, entry):
737         llfuse.invalidate_inode(entry.inode)
738         del self._entries[entry.inode]
739
740 class Operations(llfuse.Operations):
741     '''This is the main interface with llfuse.  The methods on this object are
742     called by llfuse threads to service FUSE events to query and read from
743     the file system.
744
745     llfuse has its own global lock which is acquired before calling a request handler,
746     so request handlers do not run concurrently unless the lock is explicitly released
747     using "with llfuse.lock_released:"'''
748
749     def __init__(self, uid, gid, encoding="utf-8"):
750         super(Operations, self).__init__()
751
752         self.inodes = Inodes()
753         self.uid = uid
754         self.gid = gid
755         self.encoding = encoding
756
757         # dict of inode to filehandle
758         self._filehandles = {}
759         self._filehandles_counter = 1
760
761         # Other threads that need to wait until the fuse driver
762         # is fully initialized should wait() on this event object.
763         self.initlock = threading.Event()
764
765     def init(self):
766         # Allow threads that are waiting for the driver to be finished
767         # initializing to continue
768         self.initlock.set()
769
770     def access(self, inode, mode, ctx):
771         return True
772
773     def getattr(self, inode):
774         if inode not in self.inodes:
775             raise llfuse.FUSEError(errno.ENOENT)
776
777         e = self.inodes[inode]
778
779         entry = llfuse.EntryAttributes()
780         entry.st_ino = inode
781         entry.generation = 0
782         entry.entry_timeout = 300
783         entry.attr_timeout = 300
784
785         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
786         if isinstance(e, Directory):
787             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
788         elif isinstance(e, StreamReaderFile):
789             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
790         else:
791             entry.st_mode |= stat.S_IFREG
792
793         entry.st_nlink = 1
794         entry.st_uid = self.uid
795         entry.st_gid = self.gid
796         entry.st_rdev = 0
797
798         entry.st_size = e.size()
799
800         entry.st_blksize = 512
801         entry.st_blocks = (e.size()/512)+1
802         entry.st_atime = int(e.atime())
803         entry.st_mtime = int(e.mtime())
804         entry.st_ctime = int(e.mtime())
805
806         return entry
807
808     def lookup(self, parent_inode, name):
809         name = unicode(name, self.encoding)
810         _logger.debug("arv-mount lookup: parent_inode %i name %s",
811                       parent_inode, name)
812         inode = None
813
814         if name == '.':
815             inode = parent_inode
816         else:
817             if parent_inode in self.inodes:
818                 p = self.inodes[parent_inode]
819                 if name == '..':
820                     inode = p.parent_inode
821                 elif isinstance(p, Directory) and name in p:
822                     inode = p[name].inode
823
824         if inode != None:
825             return self.getattr(inode)
826         else:
827             raise llfuse.FUSEError(errno.ENOENT)
828
829     def open(self, inode, flags):
830         if inode in self.inodes:
831             p = self.inodes[inode]
832         else:
833             raise llfuse.FUSEError(errno.ENOENT)
834
835         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
836             raise llfuse.FUSEError(errno.EROFS)
837
838         if isinstance(p, Directory):
839             raise llfuse.FUSEError(errno.EISDIR)
840
841         fh = self._filehandles_counter
842         self._filehandles_counter += 1
843         self._filehandles[fh] = FileHandle(fh, p)
844         return fh
845
846     def read(self, fh, off, size):
847         _logger.debug("arv-mount read %i %i %i", fh, off, size)
848         if fh in self._filehandles:
849             handle = self._filehandles[fh]
850         else:
851             raise llfuse.FUSEError(errno.EBADF)
852
853         # update atime
854         handle.entry._atime = time.time()
855
856         try:
857             with llfuse.lock_released:
858                 return handle.entry.readfrom(off, size)
859         except arvados.errors.NotFoundError as e:
860             _logger.warning("Block not found: " + str(e))
861             raise llfuse.FUSEError(errno.EIO)
862         except Exception:
863             _logger.exception()
864             raise llfuse.FUSEError(errno.EIO)
865
866     def release(self, fh):
867         if fh in self._filehandles:
868             del self._filehandles[fh]
869
870     def opendir(self, inode):
871         _logger.debug("arv-mount opendir: inode %i", inode)
872
873         if inode in self.inodes:
874             p = self.inodes[inode]
875         else:
876             raise llfuse.FUSEError(errno.ENOENT)
877
878         if not isinstance(p, Directory):
879             raise llfuse.FUSEError(errno.ENOTDIR)
880
881         fh = self._filehandles_counter
882         self._filehandles_counter += 1
883         if p.parent_inode in self.inodes:
884             parent = self.inodes[p.parent_inode]
885         else:
886             raise llfuse.FUSEError(errno.EIO)
887
888         # update atime
889         p._atime = time.time()
890
891         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
892         return fh
893
894     def readdir(self, fh, off):
895         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
896
897         if fh in self._filehandles:
898             handle = self._filehandles[fh]
899         else:
900             raise llfuse.FUSEError(errno.EBADF)
901
902         _logger.debug("arv-mount handle.entry %s", handle.entry)
903
904         e = off
905         while e < len(handle.entry):
906             if handle.entry[e][1].inode in self.inodes:
907                 try:
908                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
909                 except UnicodeEncodeError:
910                     pass
911             e += 1
912
913     def releasedir(self, fh):
914         del self._filehandles[fh]
915
916     def statfs(self):
917         st = llfuse.StatvfsData()
918         st.f_bsize = 64 * 1024
919         st.f_blocks = 0
920         st.f_files = 0
921
922         st.f_bfree = 0
923         st.f_bavail = 0
924
925         st.f_ffree = 0
926         st.f_favail = 0
927
928         st.f_frsize = 0
929         return st
930
931     # The llfuse documentation recommends only overloading functions that
932     # are actually implemented, as the default implementation will raise ENOSYS.
933     # However, there is a bug in the llfuse default implementation of create()
934     # "create() takes exactly 5 positional arguments (6 given)" which will crash
935     # arv-mount.
936     # The workaround is to implement it with the proper number of parameters,
937     # and then everything works out.
938     def create(self, p1, p2, p3, p4, p5):
939         raise llfuse.FUSEError(errno.EROFS)