3408: Merge branch 'master' into 3408-production-datamanager
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
25
26 _logger = logging.getLogger('arvados.arvados_fuse')
27
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
32
33 class SafeApi(object):
34     """Threadsafe wrapper for API object.
35
36     This stores and returns a different api object per thread, because
37     httplib2 which underlies apiclient is not threadsafe.
38     """
39
40     def __init__(self, config):
41         self.host = config.get('ARVADOS_API_HOST')
42         self.api_token = config.get('ARVADOS_API_TOKEN')
43         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
44         self.local = threading.local()
45         self.block_cache = arvados.KeepBlockCache()
46
47     def localapi(self):
48         if 'api' not in self.local.__dict__:
49             self.local.api = arvados.api(
50                 version='v1',
51                 host=self.host, token=self.api_token, insecure=self.insecure)
52         return self.local.api
53
54     def localkeep(self):
55         if 'keep' not in self.local.__dict__:
56             self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
57         return self.local.keep
58
59     def __getattr__(self, name):
60         # Proxy nonexistent attributes to the local API client.
61         try:
62             return getattr(self.localapi(), name)
63         except AttributeError:
64             return super(SafeApi, self).__getattr__(name)
65
66
67 def convertTime(t):
68     """Parse Arvados timestamp to unix time."""
69     if not t:
70         return 0
71     try:
72         return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
73     except (TypeError, ValueError):
74         return 0
75
76 def sanitize_filename(dirty):
77     '''Replace disallowed filename characters with harmless "_".'''
78     if dirty is None:
79         return None
80     elif dirty == '':
81         return '_'
82     elif dirty == '.':
83         return '_'
84     elif dirty == '..':
85         return '__'
86     else:
87         return _disallowed_filename_characters.sub('_', dirty)
88
89
90 class FreshBase(object):
91     '''Base class for maintaining fresh/stale state to determine when to update.'''
92     def __init__(self):
93         self._stale = True
94         self._poll = False
95         self._last_update = time.time()
96         self._atime = time.time()
97         self._poll_time = 60
98
99     # Mark the value as stale
100     def invalidate(self):
101         self._stale = True
102
103     # Test if the entries dict is stale.
104     def stale(self):
105         if self._stale:
106             return True
107         if self._poll:
108             return (self._last_update + self._poll_time) < self._atime
109         return False
110
111     def fresh(self):
112         self._stale = False
113         self._last_update = time.time()
114
115     def atime(self):
116         return self._atime
117
118 class File(FreshBase):
119     '''Base for file objects.'''
120
121     def __init__(self, parent_inode, _mtime=0):
122         super(File, self).__init__()
123         self.inode = None
124         self.parent_inode = parent_inode
125         self._mtime = _mtime
126
127     def size(self):
128         return 0
129
130     def readfrom(self, off, size):
131         return ''
132
133     def mtime(self):
134         return self._mtime
135
136
137 class StreamReaderFile(File):
138     '''Wraps a StreamFileReader as a file.'''
139
140     def __init__(self, parent_inode, reader, _mtime):
141         super(StreamReaderFile, self).__init__(parent_inode, _mtime)
142         self.reader = reader
143
144     def size(self):
145         return self.reader.size()
146
147     def readfrom(self, off, size):
148         return self.reader.readfrom(off, size)
149
150     def stale(self):
151         return False
152
153
154 class StringFile(File):
155     '''Wrap a simple string as a file'''
156     def __init__(self, parent_inode, contents, _mtime):
157         super(StringFile, self).__init__(parent_inode, _mtime)
158         self.contents = contents
159
160     def size(self):
161         return len(self.contents)
162
163     def readfrom(self, off, size):
164         return self.contents[off:(off+size)]
165
166
167 class ObjectFile(StringFile):
168     '''Wrap a dict as a serialized json object.'''
169
170     def __init__(self, parent_inode, obj):
171         super(ObjectFile, self).__init__(parent_inode, "", 0)
172         self.uuid = obj['uuid']
173         self.update(obj)
174
175     def update(self, obj):
176         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
177         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
178
179
180 class Directory(FreshBase):
181     '''Generic directory object, backed by a dict.
182     Consists of a set of entries with the key representing the filename
183     and the value referencing a File or Directory object.
184     '''
185
186     def __init__(self, parent_inode):
187         super(Directory, self).__init__()
188
189         '''parent_inode is the integer inode number'''
190         self.inode = None
191         if not isinstance(parent_inode, int):
192             raise Exception("parent_inode should be an int")
193         self.parent_inode = parent_inode
194         self._entries = {}
195         self._mtime = time.time()
196
197     #  Overriden by subclasses to implement logic to update the entries dict
198     #  when the directory is stale
199     def update(self):
200         pass
201
202     # Only used when computing the size of the disk footprint of the directory
203     # (stub)
204     def size(self):
205         return 0
206
207     def checkupdate(self):
208         if self.stale():
209             try:
210                 self.update()
211             except apiclient.errors.HttpError as e:
212                 _logger.debug(e)
213
214     def __getitem__(self, item):
215         self.checkupdate()
216         return self._entries[item]
217
218     def items(self):
219         self.checkupdate()
220         return self._entries.items()
221
222     def __iter__(self):
223         self.checkupdate()
224         return self._entries.iterkeys()
225
226     def __contains__(self, k):
227         self.checkupdate()
228         return k in self._entries
229
230     def merge(self, items, fn, same, new_entry):
231         '''Helper method for updating the contents of the directory.  Takes a list
232         describing the new contents of the directory, reuse entries that are
233         the same in both the old and new lists, create new entries, and delete
234         old entries missing from the new list.
235
236         items: iterable with new directory contents
237
238         fn: function to take an entry in 'items' and return the desired file or
239         directory name, or None if this entry should be skipped
240
241         same: function to compare an existing entry (a File or Directory
242         object) with an entry in the items list to determine whether to keep
243         the existing entry.
244
245         new_entry: function to create a new directory entry (File or Directory
246         object) from an entry in the items list.
247
248         '''
249
250         oldentries = self._entries
251         self._entries = {}
252         changed = False
253         for i in items:
254             name = sanitize_filename(fn(i))
255             if name:
256                 if name in oldentries and same(oldentries[name], i):
257                     # move existing directory entry over
258                     self._entries[name] = oldentries[name]
259                     del oldentries[name]
260                 else:
261                     # create new directory entry
262                     ent = new_entry(i)
263                     if ent is not None:
264                         self._entries[name] = self.inodes.add_entry(ent)
265                         changed = True
266
267         # delete any other directory entries that were not in found in 'items'
268         for i in oldentries:
269             llfuse.invalidate_entry(self.inode, str(i))
270             self.inodes.del_entry(oldentries[i])
271             changed = True
272
273         if changed:
274             self._mtime = time.time()
275
276         self.fresh()
277
278     def clear(self):
279         '''Delete all entries'''
280         oldentries = self._entries
281         self._entries = {}
282         for n in oldentries:
283             if isinstance(n, Directory):
284                 n.clear()
285             llfuse.invalidate_entry(self.inode, str(n))
286             self.inodes.del_entry(oldentries[n])
287         llfuse.invalidate_inode(self.inode)
288         self.invalidate()
289
290     def mtime(self):
291         return self._mtime
292
293
294 class CollectionDirectory(Directory):
295     '''Represents the root of a directory tree holding a collection.'''
296
297     def __init__(self, parent_inode, inodes, api, num_retries, collection):
298         super(CollectionDirectory, self).__init__(parent_inode)
299         self.inodes = inodes
300         self.api = api
301         self.num_retries = num_retries
302         self.collection_object_file = None
303         self.collection_object = None
304         if isinstance(collection, dict):
305             self.collection_locator = collection['uuid']
306             self._mtime = convertTime(collection.get('modified_at'))
307         else:
308             self.collection_locator = collection
309             self._mtime = 0
310
311     def same(self, i):
312         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
313
314     # Used by arv-web.py to switch the contents of the CollectionDirectory
315     def change_collection(self, new_locator):
316         """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
317         self.collection_locator = new_locator
318         self.collection_object = None
319         self.update()
320
321     def new_collection(self, new_collection_object, coll_reader):
322         self.collection_object = new_collection_object
323
324         self._mtime = convertTime(self.collection_object.get('modified_at'))
325
326         if self.collection_object_file is not None:
327             self.collection_object_file.update(self.collection_object)
328
329         self.clear()
330         for s in coll_reader.all_streams():
331             cwd = self
332             for part in s.name().split('/'):
333                 if part != '' and part != '.':
334                     partname = sanitize_filename(part)
335                     if partname not in cwd._entries:
336                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
337                     cwd = cwd._entries[partname]
338             for k, v in s.files().items():
339                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
340
341     def update(self):
342         try:
343             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
344                 return True
345
346             if self.collection_locator is None:
347                 self.fresh()
348                 return True
349
350             with llfuse.lock_released:
351                 coll_reader = arvados.CollectionReader(
352                     self.collection_locator, self.api, self.api.localkeep(),
353                     num_retries=self.num_retries)
354                 new_collection_object = coll_reader.api_response() or {}
355                 # If the Collection only exists in Keep, there will be no API
356                 # response.  Fill in the fields we need.
357                 if 'uuid' not in new_collection_object:
358                     new_collection_object['uuid'] = self.collection_locator
359                 if "portable_data_hash" not in new_collection_object:
360                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
361                 if 'manifest_text' not in new_collection_object:
362                     new_collection_object['manifest_text'] = coll_reader.manifest_text()
363                 coll_reader.normalize()
364             # end with llfuse.lock_released, re-acquire lock
365
366             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
367                 self.new_collection(new_collection_object, coll_reader)
368
369             self.fresh()
370             return True
371         except arvados.errors.NotFoundError:
372             _logger.exception("arv-mount %s: error", self.collection_locator)
373         except arvados.errors.ArgumentError as detail:
374             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
375             if self.collection_object is not None and "manifest_text" in self.collection_object:
376                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
377         except Exception:
378             _logger.exception("arv-mount %s: error", self.collection_locator)
379             if self.collection_object is not None and "manifest_text" in self.collection_object:
380                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
381         return False
382
383     def __getitem__(self, item):
384         self.checkupdate()
385         if item == '.arvados#collection':
386             if self.collection_object_file is None:
387                 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
388                 self.inodes.add_entry(self.collection_object_file)
389             return self.collection_object_file
390         else:
391             return super(CollectionDirectory, self).__getitem__(item)
392
393     def __contains__(self, k):
394         if k == '.arvados#collection':
395             return True
396         else:
397             return super(CollectionDirectory, self).__contains__(k)
398
399
400 class MagicDirectory(Directory):
401     '''A special directory that logically contains the set of all extant keep
402     locators.  When a file is referenced by lookup(), it is tested to see if it
403     is a valid keep locator to a manifest, and if so, loads the manifest
404     contents as a subdirectory of this directory with the locator as the
405     directory name.  Since querying a list of all extant keep locators is
406     impractical, only collections that have already been accessed are visible
407     to readdir().
408     '''
409
410     README_TEXT = '''
411 This directory provides access to Arvados collections as subdirectories listed
412 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
413 the form '1234567890abcdefghijklmnopqrstuv+123').
414
415 Note that this directory will appear empty until you attempt to access a
416 specific collection subdirectory (such as trying to 'cd' into it), at which
417 point the collection will actually be looked up on the server and the directory
418 will appear if it exists.
419 '''.lstrip()
420
421     def __init__(self, parent_inode, inodes, api, num_retries):
422         super(MagicDirectory, self).__init__(parent_inode)
423         self.inodes = inodes
424         self.api = api
425         self.num_retries = num_retries
426
427     def __setattr__(self, name, value):
428         super(MagicDirectory, self).__setattr__(name, value)
429         # When we're assigned an inode, add a README.
430         if ((name == 'inode') and (self.inode is not None) and
431               (not self._entries)):
432             self._entries['README'] = self.inodes.add_entry(
433                 StringFile(self.inode, self.README_TEXT, time.time()))
434             # If we're the root directory, add an identical by_id subdirectory.
435             if self.inode == llfuse.ROOT_INODE:
436                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
437                         self.inode, self.inodes, self.api, self.num_retries))
438
439     def __contains__(self, k):
440         if k in self._entries:
441             return True
442
443         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
444             return False
445
446         try:
447             e = self.inodes.add_entry(CollectionDirectory(
448                     self.inode, self.inodes, self.api, self.num_retries, k))
449             if e.update():
450                 self._entries[k] = e
451                 return True
452             else:
453                 return False
454         except Exception as e:
455             _logger.debug('arv-mount exception keep %s', e)
456             return False
457
458     def __getitem__(self, item):
459         if item in self:
460             return self._entries[item]
461         else:
462             raise KeyError("No collection with id " + item)
463
464
465 class RecursiveInvalidateDirectory(Directory):
466     def invalidate(self):
467         if self.inode == llfuse.ROOT_INODE:
468             llfuse.lock.acquire()
469         try:
470             super(RecursiveInvalidateDirectory, self).invalidate()
471             for a in self._entries:
472                 self._entries[a].invalidate()
473         except Exception:
474             _logger.exception()
475         finally:
476             if self.inode == llfuse.ROOT_INODE:
477                 llfuse.lock.release()
478
479
480 class TagsDirectory(RecursiveInvalidateDirectory):
481     '''A special directory that contains as subdirectories all tags visible to the user.'''
482
483     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
484         super(TagsDirectory, self).__init__(parent_inode)
485         self.inodes = inodes
486         self.api = api
487         self.num_retries = num_retries
488         self._poll = True
489         self._poll_time = poll_time
490
491     def update(self):
492         with llfuse.lock_released:
493             tags = self.api.links().list(
494                 filters=[['link_class', '=', 'tag']],
495                 select=['name'], distinct=True
496                 ).execute(num_retries=self.num_retries)
497         if "items" in tags:
498             self.merge(tags['items'],
499                        lambda i: i['name'],
500                        lambda a, i: a.tag == i['name'],
501                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
502
503
504 class TagDirectory(Directory):
505     '''A special directory that contains as subdirectories all collections visible
506     to the user that are tagged with a particular tag.
507     '''
508
509     def __init__(self, parent_inode, inodes, api, num_retries, tag,
510                  poll=False, poll_time=60):
511         super(TagDirectory, self).__init__(parent_inode)
512         self.inodes = inodes
513         self.api = api
514         self.num_retries = num_retries
515         self.tag = tag
516         self._poll = poll
517         self._poll_time = poll_time
518
519     def update(self):
520         with llfuse.lock_released:
521             taggedcollections = self.api.links().list(
522                 filters=[['link_class', '=', 'tag'],
523                          ['name', '=', self.tag],
524                          ['head_uuid', 'is_a', 'arvados#collection']],
525                 select=['head_uuid']
526                 ).execute(num_retries=self.num_retries)
527         self.merge(taggedcollections['items'],
528                    lambda i: i['head_uuid'],
529                    lambda a, i: a.collection_locator == i['head_uuid'],
530                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
531
532
533 class ProjectDirectory(Directory):
534     '''A special directory that contains the contents of a project.'''
535
536     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
537                  poll=False, poll_time=60):
538         super(ProjectDirectory, self).__init__(parent_inode)
539         self.inodes = inodes
540         self.api = api
541         self.num_retries = num_retries
542         self.project_object = project_object
543         self.project_object_file = None
544         self.uuid = project_object['uuid']
545         self._poll = poll
546         self._poll_time = poll_time
547
548     def createDirectory(self, i):
549         if collection_uuid_pattern.match(i['uuid']):
550             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
551         elif group_uuid_pattern.match(i['uuid']):
552             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
553         elif link_uuid_pattern.match(i['uuid']):
554             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
555                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
556             else:
557                 return None
558         elif uuid_pattern.match(i['uuid']):
559             return ObjectFile(self.parent_inode, i)
560         else:
561             return None
562
563     def update(self):
564         if self.project_object_file == None:
565             self.project_object_file = ObjectFile(self.inode, self.project_object)
566             self.inodes.add_entry(self.project_object_file)
567
568         def namefn(i):
569             if 'name' in i:
570                 if i['name'] is None or len(i['name']) == 0:
571                     return None
572                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
573                     # collection or subproject
574                     return i['name']
575                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
576                     # name link
577                     return i['name']
578                 elif 'kind' in i and i['kind'].startswith('arvados#'):
579                     # something else
580                     return "{}.{}".format(i['name'], i['kind'][8:])
581             else:
582                 return None
583
584         def samefn(a, i):
585             if isinstance(a, CollectionDirectory):
586                 return a.collection_locator == i['uuid']
587             elif isinstance(a, ProjectDirectory):
588                 return a.uuid == i['uuid']
589             elif isinstance(a, ObjectFile):
590                 return a.uuid == i['uuid'] and not a.stale()
591             return False
592
593         with llfuse.lock_released:
594             if group_uuid_pattern.match(self.uuid):
595                 self.project_object = self.api.groups().get(
596                     uuid=self.uuid).execute(num_retries=self.num_retries)
597             elif user_uuid_pattern.match(self.uuid):
598                 self.project_object = self.api.users().get(
599                     uuid=self.uuid).execute(num_retries=self.num_retries)
600
601             contents = arvados.util.list_all(self.api.groups().contents,
602                                              self.num_retries, uuid=self.uuid)
603             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
604             contents += arvados.util.list_all(
605                 self.api.links().list, self.num_retries,
606                 filters=[['tail_uuid', '=', self.uuid],
607                          ['link_class', '=', 'name']])
608
609         # end with llfuse.lock_released, re-acquire lock
610
611         self.merge(contents,
612                    namefn,
613                    samefn,
614                    self.createDirectory)
615
616     def __getitem__(self, item):
617         self.checkupdate()
618         if item == '.arvados#project':
619             return self.project_object_file
620         else:
621             return super(ProjectDirectory, self).__getitem__(item)
622
623     def __contains__(self, k):
624         if k == '.arvados#project':
625             return True
626         else:
627             return super(ProjectDirectory, self).__contains__(k)
628
629
630 class SharedDirectory(Directory):
631     '''A special directory that represents users or groups who have shared projects with me.'''
632
633     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
634                  poll=False, poll_time=60):
635         super(SharedDirectory, self).__init__(parent_inode)
636         self.inodes = inodes
637         self.api = api
638         self.num_retries = num_retries
639         self.current_user = api.users().current().execute(num_retries=num_retries)
640         self._poll = True
641         self._poll_time = poll_time
642
643     def update(self):
644         with llfuse.lock_released:
645             all_projects = arvados.util.list_all(
646                 self.api.groups().list, self.num_retries,
647                 filters=[['group_class','=','project']])
648             objects = {}
649             for ob in all_projects:
650                 objects[ob['uuid']] = ob
651
652             roots = []
653             root_owners = {}
654             for ob in all_projects:
655                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
656                     roots.append(ob)
657                     root_owners[ob['owner_uuid']] = True
658
659             lusers = arvados.util.list_all(
660                 self.api.users().list, self.num_retries,
661                 filters=[['uuid','in', list(root_owners)]])
662             lgroups = arvados.util.list_all(
663                 self.api.groups().list, self.num_retries,
664                 filters=[['uuid','in', list(root_owners)]])
665
666             users = {}
667             groups = {}
668
669             for l in lusers:
670                 objects[l["uuid"]] = l
671             for l in lgroups:
672                 objects[l["uuid"]] = l
673
674             contents = {}
675             for r in root_owners:
676                 if r in objects:
677                     obr = objects[r]
678                     if "name" in obr:
679                         contents[obr["name"]] = obr
680                     if "first_name" in obr:
681                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
682
683             for r in roots:
684                 if r['owner_uuid'] not in objects:
685                     contents[r['name']] = r
686
687         # end with llfuse.lock_released, re-acquire lock
688
689         try:
690             self.merge(contents.items(),
691                        lambda i: i[0],
692                        lambda a, i: a.uuid == i[1]['uuid'],
693                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
694         except Exception:
695             _logger.exception()
696
697
698 class FileHandle(object):
699     '''Connects a numeric file handle to a File or Directory object that has
700     been opened by the client.'''
701
702     def __init__(self, fh, entry):
703         self.fh = fh
704         self.entry = entry
705
706
707 class Inodes(object):
708     '''Manage the set of inodes.  This is the mapping from a numeric id
709     to a concrete File or Directory object'''
710
711     def __init__(self):
712         self._entries = {}
713         self._counter = itertools.count(llfuse.ROOT_INODE)
714
715     def __getitem__(self, item):
716         return self._entries[item]
717
718     def __setitem__(self, key, item):
719         self._entries[key] = item
720
721     def __iter__(self):
722         return self._entries.iterkeys()
723
724     def items(self):
725         return self._entries.items()
726
727     def __contains__(self, k):
728         return k in self._entries
729
730     def add_entry(self, entry):
731         entry.inode = next(self._counter)
732         self._entries[entry.inode] = entry
733         return entry
734
735     def del_entry(self, entry):
736         llfuse.invalidate_inode(entry.inode)
737         del self._entries[entry.inode]
738
739 class Operations(llfuse.Operations):
740     '''This is the main interface with llfuse.  The methods on this object are
741     called by llfuse threads to service FUSE events to query and read from
742     the file system.
743
744     llfuse has its own global lock which is acquired before calling a request handler,
745     so request handlers do not run concurrently unless the lock is explicitly released
746     using "with llfuse.lock_released:"'''
747
748     def __init__(self, uid, gid, encoding="utf-8"):
749         super(Operations, self).__init__()
750
751         self.inodes = Inodes()
752         self.uid = uid
753         self.gid = gid
754         self.encoding = encoding
755
756         # dict of inode to filehandle
757         self._filehandles = {}
758         self._filehandles_counter = 1
759
760         # Other threads that need to wait until the fuse driver
761         # is fully initialized should wait() on this event object.
762         self.initlock = threading.Event()
763
764     def init(self):
765         # Allow threads that are waiting for the driver to be finished
766         # initializing to continue
767         self.initlock.set()
768
769     def access(self, inode, mode, ctx):
770         return True
771
772     def getattr(self, inode):
773         if inode not in self.inodes:
774             raise llfuse.FUSEError(errno.ENOENT)
775
776         e = self.inodes[inode]
777
778         entry = llfuse.EntryAttributes()
779         entry.st_ino = inode
780         entry.generation = 0
781         entry.entry_timeout = 300
782         entry.attr_timeout = 300
783
784         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
785         if isinstance(e, Directory):
786             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
787         elif isinstance(e, StreamReaderFile):
788             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
789         else:
790             entry.st_mode |= stat.S_IFREG
791
792         entry.st_nlink = 1
793         entry.st_uid = self.uid
794         entry.st_gid = self.gid
795         entry.st_rdev = 0
796
797         entry.st_size = e.size()
798
799         entry.st_blksize = 512
800         entry.st_blocks = (e.size()/512)+1
801         entry.st_atime = int(e.atime())
802         entry.st_mtime = int(e.mtime())
803         entry.st_ctime = int(e.mtime())
804
805         return entry
806
807     def lookup(self, parent_inode, name):
808         name = unicode(name, self.encoding)
809         _logger.debug("arv-mount lookup: parent_inode %i name %s",
810                       parent_inode, name)
811         inode = None
812
813         if name == '.':
814             inode = parent_inode
815         else:
816             if parent_inode in self.inodes:
817                 p = self.inodes[parent_inode]
818                 if name == '..':
819                     inode = p.parent_inode
820                 elif isinstance(p, Directory) and name in p:
821                     inode = p[name].inode
822
823         if inode != None:
824             return self.getattr(inode)
825         else:
826             raise llfuse.FUSEError(errno.ENOENT)
827
828     def open(self, inode, flags):
829         if inode in self.inodes:
830             p = self.inodes[inode]
831         else:
832             raise llfuse.FUSEError(errno.ENOENT)
833
834         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
835             raise llfuse.FUSEError(errno.EROFS)
836
837         if isinstance(p, Directory):
838             raise llfuse.FUSEError(errno.EISDIR)
839
840         fh = self._filehandles_counter
841         self._filehandles_counter += 1
842         self._filehandles[fh] = FileHandle(fh, p)
843         return fh
844
845     def read(self, fh, off, size):
846         _logger.debug("arv-mount read %i %i %i", fh, off, size)
847         if fh in self._filehandles:
848             handle = self._filehandles[fh]
849         else:
850             raise llfuse.FUSEError(errno.EBADF)
851
852         # update atime
853         handle.entry._atime = time.time()
854
855         try:
856             with llfuse.lock_released:
857                 return handle.entry.readfrom(off, size)
858         except arvados.errors.NotFoundError as e:
859             _logger.warning("Block not found: " + str(e))
860             raise llfuse.FUSEError(errno.EIO)
861         except Exception:
862             _logger.exception()
863             raise llfuse.FUSEError(errno.EIO)
864
865     def release(self, fh):
866         if fh in self._filehandles:
867             del self._filehandles[fh]
868
869     def opendir(self, inode):
870         _logger.debug("arv-mount opendir: inode %i", inode)
871
872         if inode in self.inodes:
873             p = self.inodes[inode]
874         else:
875             raise llfuse.FUSEError(errno.ENOENT)
876
877         if not isinstance(p, Directory):
878             raise llfuse.FUSEError(errno.ENOTDIR)
879
880         fh = self._filehandles_counter
881         self._filehandles_counter += 1
882         if p.parent_inode in self.inodes:
883             parent = self.inodes[p.parent_inode]
884         else:
885             raise llfuse.FUSEError(errno.EIO)
886
887         # update atime
888         p._atime = time.time()
889
890         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
891         return fh
892
893     def readdir(self, fh, off):
894         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
895
896         if fh in self._filehandles:
897             handle = self._filehandles[fh]
898         else:
899             raise llfuse.FUSEError(errno.EBADF)
900
901         _logger.debug("arv-mount handle.entry %s", handle.entry)
902
903         e = off
904         while e < len(handle.entry):
905             if handle.entry[e][1].inode in self.inodes:
906                 try:
907                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
908                 except UnicodeEncodeError:
909                     pass
910             e += 1
911
912     def releasedir(self, fh):
913         del self._filehandles[fh]
914
915     def statfs(self):
916         st = llfuse.StatvfsData()
917         st.f_bsize = 64 * 1024
918         st.f_blocks = 0
919         st.f_files = 0
920
921         st.f_bfree = 0
922         st.f_bavail = 0
923
924         st.f_ffree = 0
925         st.f_favail = 0
926
927         st.f_frsize = 0
928         return st
929
930     # The llfuse documentation recommends only overloading functions that
931     # are actually implemented, as the default implementation will raise ENOSYS.
932     # However, there is a bug in the llfuse default implementation of create()
933     # "create() takes exactly 5 positional arguments (6 given)" which will crash
934     # arv-mount.
935     # The workaround is to implement it with the proper number of parameters,
936     # and then everything works out.
937     def create(self, p1, p2, p3, p4, p5):
938         raise llfuse.FUSEError(errno.EROFS)