4823: Remove sync_mode() from Collection in favor of writable() flag.
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
25
26 _logger = logging.getLogger('arvados.arvados_fuse')
27
28 # Match any character which FUSE or Linux cannot accommodate as part
29 # of a filename. (If present in a collection filename, they will
30 # appear as underscores in the fuse mount.)
31 _disallowed_filename_characters = re.compile('[\x00/]')
32
33 def convertTime(t):
34     """Parse Arvados timestamp to unix time."""
35     if not t:
36         return 0
37     try:
38         return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
39     except (TypeError, ValueError):
40         return 0
41
42 def sanitize_filename(dirty):
43     '''Replace disallowed filename characters with harmless "_".'''
44     if dirty is None:
45         return None
46     elif dirty == '':
47         return '_'
48     elif dirty == '.':
49         return '_'
50     elif dirty == '..':
51         return '__'
52     else:
53         return _disallowed_filename_characters.sub('_', dirty)
54
55
56 class FreshBase(object):
57     '''Base class for maintaining fresh/stale state to determine when to update.'''
58     def __init__(self):
59         self._stale = True
60         self._poll = False
61         self._last_update = time.time()
62         self._atime = time.time()
63         self._poll_time = 60
64
65     # Mark the value as stale
66     def invalidate(self):
67         self._stale = True
68
69     # Test if the entries dict is stale.
70     def stale(self):
71         if self._stale:
72             return True
73         if self._poll:
74             return (self._last_update + self._poll_time) < self._atime
75         return False
76
77     def fresh(self):
78         self._stale = False
79         self._last_update = time.time()
80
81     def atime(self):
82         return self._atime
83
84 class File(FreshBase):
85     '''Base for file objects.'''
86
87     def __init__(self, parent_inode, _mtime=0):
88         super(File, self).__init__()
89         self.inode = None
90         self.parent_inode = parent_inode
91         self._mtime = _mtime
92
93     def size(self):
94         return 0
95
96     def readfrom(self, off, size):
97         return ''
98
99     def mtime(self):
100         return self._mtime
101
102
103 class StreamReaderFile(File):
104     '''Wraps a StreamFileReader as a file.'''
105
106     def __init__(self, parent_inode, reader, _mtime):
107         super(StreamReaderFile, self).__init__(parent_inode, _mtime)
108         self.reader = reader
109
110     def size(self):
111         return self.reader.size()
112
113     def readfrom(self, off, size):
114         return self.reader.readfrom(off, size)
115
116     def stale(self):
117         return False
118
119
120 class StringFile(File):
121     '''Wrap a simple string as a file'''
122     def __init__(self, parent_inode, contents, _mtime):
123         super(StringFile, self).__init__(parent_inode, _mtime)
124         self.contents = contents
125
126     def size(self):
127         return len(self.contents)
128
129     def readfrom(self, off, size):
130         return self.contents[off:(off+size)]
131
132
133 class ObjectFile(StringFile):
134     '''Wrap a dict as a serialized json object.'''
135
136     def __init__(self, parent_inode, obj):
137         super(ObjectFile, self).__init__(parent_inode, "", 0)
138         self.uuid = obj['uuid']
139         self.update(obj)
140
141     def update(self, obj):
142         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
143         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
144
145
146 class Directory(FreshBase):
147     '''Generic directory object, backed by a dict.
148     Consists of a set of entries with the key representing the filename
149     and the value referencing a File or Directory object.
150     '''
151
152     def __init__(self, parent_inode):
153         super(Directory, self).__init__()
154
155         '''parent_inode is the integer inode number'''
156         self.inode = None
157         if not isinstance(parent_inode, int):
158             raise Exception("parent_inode should be an int")
159         self.parent_inode = parent_inode
160         self._entries = {}
161         self._mtime = time.time()
162
163     #  Overriden by subclasses to implement logic to update the entries dict
164     #  when the directory is stale
165     def update(self):
166         pass
167
168     # Only used when computing the size of the disk footprint of the directory
169     # (stub)
170     def size(self):
171         return 0
172
173     def checkupdate(self):
174         if self.stale():
175             try:
176                 self.update()
177             except apiclient.errors.HttpError as e:
178                 _logger.debug(e)
179
180     def __getitem__(self, item):
181         self.checkupdate()
182         return self._entries[item]
183
184     def items(self):
185         self.checkupdate()
186         return self._entries.items()
187
188     def __iter__(self):
189         self.checkupdate()
190         return self._entries.iterkeys()
191
192     def __contains__(self, k):
193         self.checkupdate()
194         return k in self._entries
195
196     def merge(self, items, fn, same, new_entry):
197         '''Helper method for updating the contents of the directory.  Takes a list
198         describing the new contents of the directory, reuse entries that are
199         the same in both the old and new lists, create new entries, and delete
200         old entries missing from the new list.
201
202         items: iterable with new directory contents
203
204         fn: function to take an entry in 'items' and return the desired file or
205         directory name, or None if this entry should be skipped
206
207         same: function to compare an existing entry (a File or Directory
208         object) with an entry in the items list to determine whether to keep
209         the existing entry.
210
211         new_entry: function to create a new directory entry (File or Directory
212         object) from an entry in the items list.
213
214         '''
215
216         oldentries = self._entries
217         self._entries = {}
218         changed = False
219         for i in items:
220             name = sanitize_filename(fn(i))
221             if name:
222                 if name in oldentries and same(oldentries[name], i):
223                     # move existing directory entry over
224                     self._entries[name] = oldentries[name]
225                     del oldentries[name]
226                 else:
227                     # create new directory entry
228                     ent = new_entry(i)
229                     if ent is not None:
230                         self._entries[name] = self.inodes.add_entry(ent)
231                         changed = True
232
233         # delete any other directory entries that were not in found in 'items'
234         for i in oldentries:
235             llfuse.invalidate_entry(self.inode, str(i))
236             self.inodes.del_entry(oldentries[i])
237             changed = True
238
239         if changed:
240             self._mtime = time.time()
241
242         self.fresh()
243
244     def clear(self):
245         '''Delete all entries'''
246         oldentries = self._entries
247         self._entries = {}
248         for n in oldentries:
249             if isinstance(n, Directory):
250                 n.clear()
251             llfuse.invalidate_entry(self.inode, str(n))
252             self.inodes.del_entry(oldentries[n])
253         llfuse.invalidate_inode(self.inode)
254         self.invalidate()
255
256     def mtime(self):
257         return self._mtime
258
259
260 class CollectionDirectory(Directory):
261     '''Represents the root of a directory tree holding a collection.'''
262
263     def __init__(self, parent_inode, inodes, api, num_retries, collection):
264         super(CollectionDirectory, self).__init__(parent_inode)
265         self.inodes = inodes
266         self.api = api
267         self.num_retries = num_retries
268         self.collection_object_file = None
269         self.collection_object = None
270         if isinstance(collection, dict):
271             self.collection_locator = collection['uuid']
272             self._mtime = convertTime(collection.get('modified_at'))
273         else:
274             self.collection_locator = collection
275             self._mtime = 0
276
277     def same(self, i):
278         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
279
280     # Used by arv-web.py to switch the contents of the CollectionDirectory
281     def change_collection(self, new_locator):
282         """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
283         self.collection_locator = new_locator
284         self.collection_object = None
285         self.update()
286
287     def new_collection(self, new_collection_object, coll_reader):
288         self.collection_object = new_collection_object
289
290         self._mtime = convertTime(self.collection_object.get('modified_at'))
291
292         if self.collection_object_file is not None:
293             self.collection_object_file.update(self.collection_object)
294
295         self.clear()
296         for s in coll_reader.all_streams():
297             cwd = self
298             for part in s.name().split('/'):
299                 if part != '' and part != '.':
300                     partname = sanitize_filename(part)
301                     if partname not in cwd._entries:
302                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
303                     cwd = cwd._entries[partname]
304             for k, v in s.files().items():
305                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
306
307     def update(self):
308         try:
309             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
310                 return True
311
312             if self.collection_locator is None:
313                 self.fresh()
314                 return True
315
316             with llfuse.lock_released:
317                 coll_reader = arvados.CollectionReader(
318                     self.collection_locator, self.api, self.api.keep,
319                     num_retries=self.num_retries)
320                 new_collection_object = coll_reader.api_response() or {}
321                 # If the Collection only exists in Keep, there will be no API
322                 # response.  Fill in the fields we need.
323                 if 'uuid' not in new_collection_object:
324                     new_collection_object['uuid'] = self.collection_locator
325                 if "portable_data_hash" not in new_collection_object:
326                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
327                 if 'manifest_text' not in new_collection_object:
328                     new_collection_object['manifest_text'] = coll_reader.manifest_text()
329                 coll_reader.normalize()
330             # end with llfuse.lock_released, re-acquire lock
331
332             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
333                 self.new_collection(new_collection_object, coll_reader)
334
335             self.fresh()
336             return True
337         except arvados.errors.NotFoundError:
338             _logger.exception("arv-mount %s: error", self.collection_locator)
339         except arvados.errors.ArgumentError as detail:
340             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
341             if self.collection_object is not None and "manifest_text" in self.collection_object:
342                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
343         except Exception:
344             _logger.exception("arv-mount %s: error", self.collection_locator)
345             if self.collection_object is not None and "manifest_text" in self.collection_object:
346                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
347         return False
348
349     def __getitem__(self, item):
350         self.checkupdate()
351         if item == '.arvados#collection':
352             if self.collection_object_file is None:
353                 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
354                 self.inodes.add_entry(self.collection_object_file)
355             return self.collection_object_file
356         else:
357             return super(CollectionDirectory, self).__getitem__(item)
358
359     def __contains__(self, k):
360         if k == '.arvados#collection':
361             return True
362         else:
363             return super(CollectionDirectory, self).__contains__(k)
364
365
366 class MagicDirectory(Directory):
367     '''A special directory that logically contains the set of all extant keep
368     locators.  When a file is referenced by lookup(), it is tested to see if it
369     is a valid keep locator to a manifest, and if so, loads the manifest
370     contents as a subdirectory of this directory with the locator as the
371     directory name.  Since querying a list of all extant keep locators is
372     impractical, only collections that have already been accessed are visible
373     to readdir().
374     '''
375
376     README_TEXT = '''
377 This directory provides access to Arvados collections as subdirectories listed
378 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
379 the form '1234567890abcdefghijklmnopqrstuv+123').
380
381 Note that this directory will appear empty until you attempt to access a
382 specific collection subdirectory (such as trying to 'cd' into it), at which
383 point the collection will actually be looked up on the server and the directory
384 will appear if it exists.
385 '''.lstrip()
386
387     def __init__(self, parent_inode, inodes, api, num_retries):
388         super(MagicDirectory, self).__init__(parent_inode)
389         self.inodes = inodes
390         self.api = api
391         self.num_retries = num_retries
392
393     def __setattr__(self, name, value):
394         super(MagicDirectory, self).__setattr__(name, value)
395         # When we're assigned an inode, add a README.
396         if ((name == 'inode') and (self.inode is not None) and
397               (not self._entries)):
398             self._entries['README'] = self.inodes.add_entry(
399                 StringFile(self.inode, self.README_TEXT, time.time()))
400             # If we're the root directory, add an identical by_id subdirectory.
401             if self.inode == llfuse.ROOT_INODE:
402                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
403                         self.inode, self.inodes, self.api, self.num_retries))
404
405     def __contains__(self, k):
406         if k in self._entries:
407             return True
408
409         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
410             return False
411
412         try:
413             e = self.inodes.add_entry(CollectionDirectory(
414                     self.inode, self.inodes, self.api, self.num_retries, k))
415             if e.update():
416                 self._entries[k] = e
417                 return True
418             else:
419                 return False
420         except Exception as e:
421             _logger.debug('arv-mount exception keep %s', e)
422             return False
423
424     def __getitem__(self, item):
425         if item in self:
426             return self._entries[item]
427         else:
428             raise KeyError("No collection with id " + item)
429
430
431 class RecursiveInvalidateDirectory(Directory):
432     def invalidate(self):
433         if self.inode == llfuse.ROOT_INODE:
434             llfuse.lock.acquire()
435         try:
436             super(RecursiveInvalidateDirectory, self).invalidate()
437             for a in self._entries:
438                 self._entries[a].invalidate()
439         except Exception:
440             _logger.exception()
441         finally:
442             if self.inode == llfuse.ROOT_INODE:
443                 llfuse.lock.release()
444
445
446 class TagsDirectory(RecursiveInvalidateDirectory):
447     '''A special directory that contains as subdirectories all tags visible to the user.'''
448
449     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
450         super(TagsDirectory, self).__init__(parent_inode)
451         self.inodes = inodes
452         self.api = api
453         self.num_retries = num_retries
454         self._poll = True
455         self._poll_time = poll_time
456
457     def update(self):
458         with llfuse.lock_released:
459             tags = self.api.links().list(
460                 filters=[['link_class', '=', 'tag']],
461                 select=['name'], distinct=True
462                 ).execute(num_retries=self.num_retries)
463         if "items" in tags:
464             self.merge(tags['items'],
465                        lambda i: i['name'],
466                        lambda a, i: a.tag == i['name'],
467                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
468
469
470 class TagDirectory(Directory):
471     '''A special directory that contains as subdirectories all collections visible
472     to the user that are tagged with a particular tag.
473     '''
474
475     def __init__(self, parent_inode, inodes, api, num_retries, tag,
476                  poll=False, poll_time=60):
477         super(TagDirectory, self).__init__(parent_inode)
478         self.inodes = inodes
479         self.api = api
480         self.num_retries = num_retries
481         self.tag = tag
482         self._poll = poll
483         self._poll_time = poll_time
484
485     def update(self):
486         with llfuse.lock_released:
487             taggedcollections = self.api.links().list(
488                 filters=[['link_class', '=', 'tag'],
489                          ['name', '=', self.tag],
490                          ['head_uuid', 'is_a', 'arvados#collection']],
491                 select=['head_uuid']
492                 ).execute(num_retries=self.num_retries)
493         self.merge(taggedcollections['items'],
494                    lambda i: i['head_uuid'],
495                    lambda a, i: a.collection_locator == i['head_uuid'],
496                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
497
498
499 class ProjectDirectory(Directory):
500     '''A special directory that contains the contents of a project.'''
501
502     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
503                  poll=False, poll_time=60):
504         super(ProjectDirectory, self).__init__(parent_inode)
505         self.inodes = inodes
506         self.api = api
507         self.num_retries = num_retries
508         self.project_object = project_object
509         self.project_object_file = None
510         self.uuid = project_object['uuid']
511         self._poll = poll
512         self._poll_time = poll_time
513
514     def createDirectory(self, i):
515         if collection_uuid_pattern.match(i['uuid']):
516             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
517         elif group_uuid_pattern.match(i['uuid']):
518             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
519         elif link_uuid_pattern.match(i['uuid']):
520             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
521                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
522             else:
523                 return None
524         elif uuid_pattern.match(i['uuid']):
525             return ObjectFile(self.parent_inode, i)
526         else:
527             return None
528
529     def update(self):
530         if self.project_object_file == None:
531             self.project_object_file = ObjectFile(self.inode, self.project_object)
532             self.inodes.add_entry(self.project_object_file)
533
534         def namefn(i):
535             if 'name' in i:
536                 if i['name'] is None or len(i['name']) == 0:
537                     return None
538                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
539                     # collection or subproject
540                     return i['name']
541                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
542                     # name link
543                     return i['name']
544                 elif 'kind' in i and i['kind'].startswith('arvados#'):
545                     # something else
546                     return "{}.{}".format(i['name'], i['kind'][8:])
547             else:
548                 return None
549
550         def samefn(a, i):
551             if isinstance(a, CollectionDirectory):
552                 return a.collection_locator == i['uuid']
553             elif isinstance(a, ProjectDirectory):
554                 return a.uuid == i['uuid']
555             elif isinstance(a, ObjectFile):
556                 return a.uuid == i['uuid'] and not a.stale()
557             return False
558
559         with llfuse.lock_released:
560             if group_uuid_pattern.match(self.uuid):
561                 self.project_object = self.api.groups().get(
562                     uuid=self.uuid).execute(num_retries=self.num_retries)
563             elif user_uuid_pattern.match(self.uuid):
564                 self.project_object = self.api.users().get(
565                     uuid=self.uuid).execute(num_retries=self.num_retries)
566
567             contents = arvados.util.list_all(self.api.groups().contents,
568                                              self.num_retries, uuid=self.uuid)
569             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
570             contents += arvados.util.list_all(
571                 self.api.links().list, self.num_retries,
572                 filters=[['tail_uuid', '=', self.uuid],
573                          ['link_class', '=', 'name']])
574
575         # end with llfuse.lock_released, re-acquire lock
576
577         self.merge(contents,
578                    namefn,
579                    samefn,
580                    self.createDirectory)
581
582     def __getitem__(self, item):
583         self.checkupdate()
584         if item == '.arvados#project':
585             return self.project_object_file
586         else:
587             return super(ProjectDirectory, self).__getitem__(item)
588
589     def __contains__(self, k):
590         if k == '.arvados#project':
591             return True
592         else:
593             return super(ProjectDirectory, self).__contains__(k)
594
595
596 class SharedDirectory(Directory):
597     '''A special directory that represents users or groups who have shared projects with me.'''
598
599     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
600                  poll=False, poll_time=60):
601         super(SharedDirectory, self).__init__(parent_inode)
602         self.inodes = inodes
603         self.api = api
604         self.num_retries = num_retries
605         self.current_user = api.users().current().execute(num_retries=num_retries)
606         self._poll = True
607         self._poll_time = poll_time
608
609     def update(self):
610         with llfuse.lock_released:
611             all_projects = arvados.util.list_all(
612                 self.api.groups().list, self.num_retries,
613                 filters=[['group_class','=','project']])
614             objects = {}
615             for ob in all_projects:
616                 objects[ob['uuid']] = ob
617
618             roots = []
619             root_owners = {}
620             for ob in all_projects:
621                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
622                     roots.append(ob)
623                     root_owners[ob['owner_uuid']] = True
624
625             lusers = arvados.util.list_all(
626                 self.api.users().list, self.num_retries,
627                 filters=[['uuid','in', list(root_owners)]])
628             lgroups = arvados.util.list_all(
629                 self.api.groups().list, self.num_retries,
630                 filters=[['uuid','in', list(root_owners)]])
631
632             users = {}
633             groups = {}
634
635             for l in lusers:
636                 objects[l["uuid"]] = l
637             for l in lgroups:
638                 objects[l["uuid"]] = l
639
640             contents = {}
641             for r in root_owners:
642                 if r in objects:
643                     obr = objects[r]
644                     if "name" in obr:
645                         contents[obr["name"]] = obr
646                     if "first_name" in obr:
647                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
648
649             for r in roots:
650                 if r['owner_uuid'] not in objects:
651                     contents[r['name']] = r
652
653         # end with llfuse.lock_released, re-acquire lock
654
655         try:
656             self.merge(contents.items(),
657                        lambda i: i[0],
658                        lambda a, i: a.uuid == i[1]['uuid'],
659                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
660         except Exception:
661             _logger.exception()
662
663
664 class FileHandle(object):
665     '''Connects a numeric file handle to a File or Directory object that has
666     been opened by the client.'''
667
668     def __init__(self, fh, entry):
669         self.fh = fh
670         self.entry = entry
671
672
673 class Inodes(object):
674     '''Manage the set of inodes.  This is the mapping from a numeric id
675     to a concrete File or Directory object'''
676
677     def __init__(self):
678         self._entries = {}
679         self._counter = itertools.count(llfuse.ROOT_INODE)
680
681     def __getitem__(self, item):
682         return self._entries[item]
683
684     def __setitem__(self, key, item):
685         self._entries[key] = item
686
687     def __iter__(self):
688         return self._entries.iterkeys()
689
690     def items(self):
691         return self._entries.items()
692
693     def __contains__(self, k):
694         return k in self._entries
695
696     def add_entry(self, entry):
697         entry.inode = next(self._counter)
698         self._entries[entry.inode] = entry
699         return entry
700
701     def del_entry(self, entry):
702         llfuse.invalidate_inode(entry.inode)
703         del self._entries[entry.inode]
704
705 class Operations(llfuse.Operations):
706     '''This is the main interface with llfuse.  The methods on this object are
707     called by llfuse threads to service FUSE events to query and read from
708     the file system.
709
710     llfuse has its own global lock which is acquired before calling a request handler,
711     so request handlers do not run concurrently unless the lock is explicitly released
712     using "with llfuse.lock_released:"'''
713
714     def __init__(self, uid, gid, encoding="utf-8"):
715         super(Operations, self).__init__()
716
717         self.inodes = Inodes()
718         self.uid = uid
719         self.gid = gid
720         self.encoding = encoding
721
722         # dict of inode to filehandle
723         self._filehandles = {}
724         self._filehandles_counter = 1
725
726         # Other threads that need to wait until the fuse driver
727         # is fully initialized should wait() on this event object.
728         self.initlock = threading.Event()
729
730     def init(self):
731         # Allow threads that are waiting for the driver to be finished
732         # initializing to continue
733         self.initlock.set()
734
735     def access(self, inode, mode, ctx):
736         return True
737
738     def getattr(self, inode):
739         if inode not in self.inodes:
740             raise llfuse.FUSEError(errno.ENOENT)
741
742         e = self.inodes[inode]
743
744         entry = llfuse.EntryAttributes()
745         entry.st_ino = inode
746         entry.generation = 0
747         entry.entry_timeout = 300
748         entry.attr_timeout = 300
749
750         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
751         if isinstance(e, Directory):
752             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
753         elif isinstance(e, StreamReaderFile):
754             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
755         else:
756             entry.st_mode |= stat.S_IFREG
757
758         entry.st_nlink = 1
759         entry.st_uid = self.uid
760         entry.st_gid = self.gid
761         entry.st_rdev = 0
762
763         entry.st_size = e.size()
764
765         entry.st_blksize = 512
766         entry.st_blocks = (e.size()/512)+1
767         entry.st_atime = int(e.atime())
768         entry.st_mtime = int(e.mtime())
769         entry.st_ctime = int(e.mtime())
770
771         return entry
772
773     def lookup(self, parent_inode, name):
774         name = unicode(name, self.encoding)
775         _logger.debug("arv-mount lookup: parent_inode %i name %s",
776                       parent_inode, name)
777         inode = None
778
779         if name == '.':
780             inode = parent_inode
781         else:
782             if parent_inode in self.inodes:
783                 p = self.inodes[parent_inode]
784                 if name == '..':
785                     inode = p.parent_inode
786                 elif isinstance(p, Directory) and name in p:
787                     inode = p[name].inode
788
789         if inode != None:
790             return self.getattr(inode)
791         else:
792             raise llfuse.FUSEError(errno.ENOENT)
793
794     def open(self, inode, flags):
795         if inode in self.inodes:
796             p = self.inodes[inode]
797         else:
798             raise llfuse.FUSEError(errno.ENOENT)
799
800         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
801             raise llfuse.FUSEError(errno.EROFS)
802
803         if isinstance(p, Directory):
804             raise llfuse.FUSEError(errno.EISDIR)
805
806         fh = self._filehandles_counter
807         self._filehandles_counter += 1
808         self._filehandles[fh] = FileHandle(fh, p)
809         return fh
810
811     def read(self, fh, off, size):
812         _logger.debug("arv-mount read %i %i %i", fh, off, size)
813         if fh in self._filehandles:
814             handle = self._filehandles[fh]
815         else:
816             raise llfuse.FUSEError(errno.EBADF)
817
818         # update atime
819         handle.entry._atime = time.time()
820
821         try:
822             with llfuse.lock_released:
823                 return handle.entry.readfrom(off, size)
824         except arvados.errors.NotFoundError as e:
825             _logger.warning("Block not found: " + str(e))
826             raise llfuse.FUSEError(errno.EIO)
827         except Exception:
828             _logger.exception()
829             raise llfuse.FUSEError(errno.EIO)
830
831     def release(self, fh):
832         if fh in self._filehandles:
833             del self._filehandles[fh]
834
835     def opendir(self, inode):
836         _logger.debug("arv-mount opendir: inode %i", inode)
837
838         if inode in self.inodes:
839             p = self.inodes[inode]
840         else:
841             raise llfuse.FUSEError(errno.ENOENT)
842
843         if not isinstance(p, Directory):
844             raise llfuse.FUSEError(errno.ENOTDIR)
845
846         fh = self._filehandles_counter
847         self._filehandles_counter += 1
848         if p.parent_inode in self.inodes:
849             parent = self.inodes[p.parent_inode]
850         else:
851             raise llfuse.FUSEError(errno.EIO)
852
853         # update atime
854         p._atime = time.time()
855
856         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
857         return fh
858
859     def readdir(self, fh, off):
860         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
861
862         if fh in self._filehandles:
863             handle = self._filehandles[fh]
864         else:
865             raise llfuse.FUSEError(errno.EBADF)
866
867         _logger.debug("arv-mount handle.entry %s", handle.entry)
868
869         e = off
870         while e < len(handle.entry):
871             if handle.entry[e][1].inode in self.inodes:
872                 try:
873                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
874                 except UnicodeEncodeError:
875                     pass
876             e += 1
877
878     def releasedir(self, fh):
879         del self._filehandles[fh]
880
881     def statfs(self):
882         st = llfuse.StatvfsData()
883         st.f_bsize = 64 * 1024
884         st.f_blocks = 0
885         st.f_files = 0
886
887         st.f_bfree = 0
888         st.f_bavail = 0
889
890         st.f_ffree = 0
891         st.f_favail = 0
892
893         st.f_frsize = 0
894         return st
895
896     # The llfuse documentation recommends only overloading functions that
897     # are actually implemented, as the default implementation will raise ENOSYS.
898     # However, there is a bug in the llfuse default implementation of create()
899     # "create() takes exactly 5 positional arguments (6 given)" which will crash
900     # arv-mount.
901     # The workaround is to implement it with the proper number of parameters,
902     # and then everything works out.
903     def create(self, inode_parent, name, mode, flags, ctx):
904         raise llfuse.FUSEError(errno.EROFS)