Merge branch '5043-crunchstat-long-lines' closes #5043
[arvados.git] / services / fuse / arvados_fuse / __init__.py
1 #
2 # FUSE driver for Arvados Keep
3 #
4
5 import os
6 import sys
7 import llfuse
8 import errno
9 import stat
10 import threading
11 import arvados
12 import pprint
13 import arvados.events
14 import re
15 import apiclient
16 import json
17 import logging
18 import time
19 import _strptime
20 import calendar
21 import threading
22 import itertools
23 import ciso8601
24
25 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
26
27 _logger = logging.getLogger('arvados.arvados_fuse')
28
29 # Match any character which FUSE or Linux cannot accommodate as part
30 # of a filename. (If present in a collection filename, they will
31 # appear as underscores in the fuse mount.)
32 _disallowed_filename_characters = re.compile('[\x00/]')
33
34 def convertTime(t):
35     """Parse Arvados timestamp to unix time."""
36     if not t:
37         return 0
38     try:
39         return calendar.timegm(ciso8601.parse_datetime_unaware(t).timetuple())
40     except (TypeError, ValueError):
41         return 0
42
43 def sanitize_filename(dirty):
44     '''Replace disallowed filename characters with harmless "_".'''
45     if dirty is None:
46         return None
47     elif dirty == '':
48         return '_'
49     elif dirty == '.':
50         return '_'
51     elif dirty == '..':
52         return '__'
53     else:
54         return _disallowed_filename_characters.sub('_', dirty)
55
56
57 class FreshBase(object):
58     '''Base class for maintaining fresh/stale state to determine when to update.'''
59     def __init__(self):
60         self._stale = True
61         self._poll = False
62         self._last_update = time.time()
63         self._atime = time.time()
64         self._poll_time = 60
65
66     # Mark the value as stale
67     def invalidate(self):
68         self._stale = True
69
70     # Test if the entries dict is stale.
71     def stale(self):
72         if self._stale:
73             return True
74         if self._poll:
75             return (self._last_update + self._poll_time) < self._atime
76         return False
77
78     def fresh(self):
79         self._stale = False
80         self._last_update = time.time()
81
82     def atime(self):
83         return self._atime
84
85 class File(FreshBase):
86     '''Base for file objects.'''
87
88     def __init__(self, parent_inode, _mtime=0):
89         super(File, self).__init__()
90         self.inode = None
91         self.parent_inode = parent_inode
92         self._mtime = _mtime
93
94     def size(self):
95         return 0
96
97     def readfrom(self, off, size):
98         return ''
99
100     def mtime(self):
101         return self._mtime
102
103
104 class StreamReaderFile(File):
105     '''Wraps a StreamFileReader as a file.'''
106
107     def __init__(self, parent_inode, reader, _mtime):
108         super(StreamReaderFile, self).__init__(parent_inode, _mtime)
109         self.reader = reader
110
111     def size(self):
112         return self.reader.size()
113
114     def readfrom(self, off, size):
115         return self.reader.readfrom(off, size)
116
117     def stale(self):
118         return False
119
120
121 class StringFile(File):
122     '''Wrap a simple string as a file'''
123     def __init__(self, parent_inode, contents, _mtime):
124         super(StringFile, self).__init__(parent_inode, _mtime)
125         self.contents = contents
126
127     def size(self):
128         return len(self.contents)
129
130     def readfrom(self, off, size):
131         return self.contents[off:(off+size)]
132
133
134 class ObjectFile(StringFile):
135     '''Wrap a dict as a serialized json object.'''
136
137     def __init__(self, parent_inode, obj):
138         super(ObjectFile, self).__init__(parent_inode, "", 0)
139         self.uuid = obj['uuid']
140         self.update(obj)
141
142     def update(self, obj):
143         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
144         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
145
146
147 class Directory(FreshBase):
148     '''Generic directory object, backed by a dict.
149     Consists of a set of entries with the key representing the filename
150     and the value referencing a File or Directory object.
151     '''
152
153     def __init__(self, parent_inode):
154         super(Directory, self).__init__()
155
156         '''parent_inode is the integer inode number'''
157         self.inode = None
158         if not isinstance(parent_inode, int):
159             raise Exception("parent_inode should be an int")
160         self.parent_inode = parent_inode
161         self._entries = {}
162         self._mtime = time.time()
163
164     #  Overriden by subclasses to implement logic to update the entries dict
165     #  when the directory is stale
166     def update(self):
167         pass
168
169     # Only used when computing the size of the disk footprint of the directory
170     # (stub)
171     def size(self):
172         return 0
173
174     def checkupdate(self):
175         if self.stale():
176             try:
177                 self.update()
178             except apiclient.errors.HttpError as e:
179                 _logger.debug(e)
180
181     def __getitem__(self, item):
182         self.checkupdate()
183         return self._entries[item]
184
185     def items(self):
186         self.checkupdate()
187         return self._entries.items()
188
189     def __iter__(self):
190         self.checkupdate()
191         return self._entries.iterkeys()
192
193     def __contains__(self, k):
194         self.checkupdate()
195         return k in self._entries
196
197     def merge(self, items, fn, same, new_entry):
198         '''Helper method for updating the contents of the directory.  Takes a list
199         describing the new contents of the directory, reuse entries that are
200         the same in both the old and new lists, create new entries, and delete
201         old entries missing from the new list.
202
203         items: iterable with new directory contents
204
205         fn: function to take an entry in 'items' and return the desired file or
206         directory name, or None if this entry should be skipped
207
208         same: function to compare an existing entry (a File or Directory
209         object) with an entry in the items list to determine whether to keep
210         the existing entry.
211
212         new_entry: function to create a new directory entry (File or Directory
213         object) from an entry in the items list.
214
215         '''
216
217         oldentries = self._entries
218         self._entries = {}
219         changed = False
220         for i in items:
221             name = sanitize_filename(fn(i))
222             if name:
223                 if name in oldentries and same(oldentries[name], i):
224                     # move existing directory entry over
225                     self._entries[name] = oldentries[name]
226                     del oldentries[name]
227                 else:
228                     # create new directory entry
229                     ent = new_entry(i)
230                     if ent is not None:
231                         self._entries[name] = self.inodes.add_entry(ent)
232                         changed = True
233
234         # delete any other directory entries that were not in found in 'items'
235         for i in oldentries:
236             llfuse.invalidate_entry(self.inode, str(i))
237             self.inodes.del_entry(oldentries[i])
238             changed = True
239
240         if changed:
241             self._mtime = time.time()
242
243         self.fresh()
244
245     def clear(self):
246         '''Delete all entries'''
247         oldentries = self._entries
248         self._entries = {}
249         for n in oldentries:
250             if isinstance(n, Directory):
251                 n.clear()
252             llfuse.invalidate_entry(self.inode, str(n))
253             self.inodes.del_entry(oldentries[n])
254         llfuse.invalidate_inode(self.inode)
255         self.invalidate()
256
257     def mtime(self):
258         return self._mtime
259
260
261 class CollectionDirectory(Directory):
262     '''Represents the root of a directory tree holding a collection.'''
263
264     def __init__(self, parent_inode, inodes, api, num_retries, collection):
265         super(CollectionDirectory, self).__init__(parent_inode)
266         self.inodes = inodes
267         self.api = api
268         self.num_retries = num_retries
269         self.collection_object_file = None
270         self.collection_object = None
271         if isinstance(collection, dict):
272             self.collection_locator = collection['uuid']
273             self._mtime = convertTime(collection.get('modified_at'))
274         else:
275             self.collection_locator = collection
276             self._mtime = 0
277
278     def same(self, i):
279         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
280
281     # Used by arv-web.py to switch the contents of the CollectionDirectory
282     def change_collection(self, new_locator):
283         """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
284         self.collection_locator = new_locator
285         self.collection_object = None
286         self.update()
287
288     def new_collection(self, new_collection_object, coll_reader):
289         self.collection_object = new_collection_object
290
291         self._mtime = convertTime(self.collection_object.get('modified_at'))
292
293         if self.collection_object_file is not None:
294             self.collection_object_file.update(self.collection_object)
295
296         self.clear()
297         for s in coll_reader.all_streams():
298             cwd = self
299             for part in s.name().split('/'):
300                 if part != '' and part != '.':
301                     partname = sanitize_filename(part)
302                     if partname not in cwd._entries:
303                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
304                     cwd = cwd._entries[partname]
305             for k, v in s.files().items():
306                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
307
308     def update(self):
309         try:
310             if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
311                 return True
312
313             if self.collection_locator is None:
314                 self.fresh()
315                 return True
316
317             with llfuse.lock_released:
318                 coll_reader = arvados.CollectionReader(
319                     self.collection_locator, self.api, self.api.keep,
320                     num_retries=self.num_retries)
321                 new_collection_object = coll_reader.api_response() or {}
322                 # If the Collection only exists in Keep, there will be no API
323                 # response.  Fill in the fields we need.
324                 if 'uuid' not in new_collection_object:
325                     new_collection_object['uuid'] = self.collection_locator
326                 if "portable_data_hash" not in new_collection_object:
327                     new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
328                 if 'manifest_text' not in new_collection_object:
329                     new_collection_object['manifest_text'] = coll_reader.manifest_text()
330                 coll_reader.normalize()
331             # end with llfuse.lock_released, re-acquire lock
332
333             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
334                 self.new_collection(new_collection_object, coll_reader)
335
336             self.fresh()
337             return True
338         except arvados.errors.NotFoundError:
339             _logger.exception("arv-mount %s: error", self.collection_locator)
340         except arvados.errors.ArgumentError as detail:
341             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
342             if self.collection_object is not None and "manifest_text" in self.collection_object:
343                 _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
344         except Exception:
345             _logger.exception("arv-mount %s: error", self.collection_locator)
346             if self.collection_object is not None and "manifest_text" in self.collection_object:
347                 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
348         return False
349
350     def __getitem__(self, item):
351         self.checkupdate()
352         if item == '.arvados#collection':
353             if self.collection_object_file is None:
354                 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
355                 self.inodes.add_entry(self.collection_object_file)
356             return self.collection_object_file
357         else:
358             return super(CollectionDirectory, self).__getitem__(item)
359
360     def __contains__(self, k):
361         if k == '.arvados#collection':
362             return True
363         else:
364             return super(CollectionDirectory, self).__contains__(k)
365
366
367 class MagicDirectory(Directory):
368     '''A special directory that logically contains the set of all extant keep
369     locators.  When a file is referenced by lookup(), it is tested to see if it
370     is a valid keep locator to a manifest, and if so, loads the manifest
371     contents as a subdirectory of this directory with the locator as the
372     directory name.  Since querying a list of all extant keep locators is
373     impractical, only collections that have already been accessed are visible
374     to readdir().
375     '''
376
377     README_TEXT = '''
378 This directory provides access to Arvados collections as subdirectories listed
379 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
380 the form '1234567890abcdefghijklmnopqrstuv+123').
381
382 Note that this directory will appear empty until you attempt to access a
383 specific collection subdirectory (such as trying to 'cd' into it), at which
384 point the collection will actually be looked up on the server and the directory
385 will appear if it exists.
386 '''.lstrip()
387
388     def __init__(self, parent_inode, inodes, api, num_retries):
389         super(MagicDirectory, self).__init__(parent_inode)
390         self.inodes = inodes
391         self.api = api
392         self.num_retries = num_retries
393
394     def __setattr__(self, name, value):
395         super(MagicDirectory, self).__setattr__(name, value)
396         # When we're assigned an inode, add a README.
397         if ((name == 'inode') and (self.inode is not None) and
398               (not self._entries)):
399             self._entries['README'] = self.inodes.add_entry(
400                 StringFile(self.inode, self.README_TEXT, time.time()))
401             # If we're the root directory, add an identical by_id subdirectory.
402             if self.inode == llfuse.ROOT_INODE:
403                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
404                         self.inode, self.inodes, self.api, self.num_retries))
405
406     def __contains__(self, k):
407         if k in self._entries:
408             return True
409
410         if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
411             return False
412
413         try:
414             e = self.inodes.add_entry(CollectionDirectory(
415                     self.inode, self.inodes, self.api, self.num_retries, k))
416             if e.update():
417                 self._entries[k] = e
418                 return True
419             else:
420                 return False
421         except Exception as e:
422             _logger.debug('arv-mount exception keep %s', e)
423             return False
424
425     def __getitem__(self, item):
426         if item in self:
427             return self._entries[item]
428         else:
429             raise KeyError("No collection with id " + item)
430
431
432 class RecursiveInvalidateDirectory(Directory):
433     def invalidate(self):
434         if self.inode == llfuse.ROOT_INODE:
435             llfuse.lock.acquire()
436         try:
437             super(RecursiveInvalidateDirectory, self).invalidate()
438             for a in self._entries:
439                 self._entries[a].invalidate()
440         except Exception:
441             _logger.exception()
442         finally:
443             if self.inode == llfuse.ROOT_INODE:
444                 llfuse.lock.release()
445
446
447 class TagsDirectory(RecursiveInvalidateDirectory):
448     '''A special directory that contains as subdirectories all tags visible to the user.'''
449
450     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
451         super(TagsDirectory, self).__init__(parent_inode)
452         self.inodes = inodes
453         self.api = api
454         self.num_retries = num_retries
455         self._poll = True
456         self._poll_time = poll_time
457
458     def update(self):
459         with llfuse.lock_released:
460             tags = self.api.links().list(
461                 filters=[['link_class', '=', 'tag']],
462                 select=['name'], distinct=True
463                 ).execute(num_retries=self.num_retries)
464         if "items" in tags:
465             self.merge(tags['items'],
466                        lambda i: i['name'],
467                        lambda a, i: a.tag == i['name'],
468                        lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
469
470
471 class TagDirectory(Directory):
472     '''A special directory that contains as subdirectories all collections visible
473     to the user that are tagged with a particular tag.
474     '''
475
476     def __init__(self, parent_inode, inodes, api, num_retries, tag,
477                  poll=False, poll_time=60):
478         super(TagDirectory, self).__init__(parent_inode)
479         self.inodes = inodes
480         self.api = api
481         self.num_retries = num_retries
482         self.tag = tag
483         self._poll = poll
484         self._poll_time = poll_time
485
486     def update(self):
487         with llfuse.lock_released:
488             taggedcollections = self.api.links().list(
489                 filters=[['link_class', '=', 'tag'],
490                          ['name', '=', self.tag],
491                          ['head_uuid', 'is_a', 'arvados#collection']],
492                 select=['head_uuid']
493                 ).execute(num_retries=self.num_retries)
494         self.merge(taggedcollections['items'],
495                    lambda i: i['head_uuid'],
496                    lambda a, i: a.collection_locator == i['head_uuid'],
497                    lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
498
499
500 class ProjectDirectory(Directory):
501     '''A special directory that contains the contents of a project.'''
502
503     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
504                  poll=False, poll_time=60):
505         super(ProjectDirectory, self).__init__(parent_inode)
506         self.inodes = inodes
507         self.api = api
508         self.num_retries = num_retries
509         self.project_object = project_object
510         self.project_object_file = None
511         self.uuid = project_object['uuid']
512         self._poll = poll
513         self._poll_time = poll_time
514
515     def createDirectory(self, i):
516         if collection_uuid_pattern.match(i['uuid']):
517             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
518         elif group_uuid_pattern.match(i['uuid']):
519             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
520         elif link_uuid_pattern.match(i['uuid']):
521             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
522                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
523             else:
524                 return None
525         elif uuid_pattern.match(i['uuid']):
526             return ObjectFile(self.parent_inode, i)
527         else:
528             return None
529
530     def update(self):
531         if self.project_object_file == None:
532             self.project_object_file = ObjectFile(self.inode, self.project_object)
533             self.inodes.add_entry(self.project_object_file)
534
535         def namefn(i):
536             if 'name' in i:
537                 if i['name'] is None or len(i['name']) == 0:
538                     return None
539                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
540                     # collection or subproject
541                     return i['name']
542                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
543                     # name link
544                     return i['name']
545                 elif 'kind' in i and i['kind'].startswith('arvados#'):
546                     # something else
547                     return "{}.{}".format(i['name'], i['kind'][8:])
548             else:
549                 return None
550
551         def samefn(a, i):
552             if isinstance(a, CollectionDirectory):
553                 return a.collection_locator == i['uuid']
554             elif isinstance(a, ProjectDirectory):
555                 return a.uuid == i['uuid']
556             elif isinstance(a, ObjectFile):
557                 return a.uuid == i['uuid'] and not a.stale()
558             return False
559
560         with llfuse.lock_released:
561             if group_uuid_pattern.match(self.uuid):
562                 self.project_object = self.api.groups().get(
563                     uuid=self.uuid).execute(num_retries=self.num_retries)
564             elif user_uuid_pattern.match(self.uuid):
565                 self.project_object = self.api.users().get(
566                     uuid=self.uuid).execute(num_retries=self.num_retries)
567
568             contents = arvados.util.list_all(self.api.groups().contents,
569                                              self.num_retries, uuid=self.uuid)
570             # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
571             contents += arvados.util.list_all(
572                 self.api.links().list, self.num_retries,
573                 filters=[['tail_uuid', '=', self.uuid],
574                          ['link_class', '=', 'name']])
575
576         # end with llfuse.lock_released, re-acquire lock
577
578         self.merge(contents,
579                    namefn,
580                    samefn,
581                    self.createDirectory)
582
583     def __getitem__(self, item):
584         self.checkupdate()
585         if item == '.arvados#project':
586             return self.project_object_file
587         else:
588             return super(ProjectDirectory, self).__getitem__(item)
589
590     def __contains__(self, k):
591         if k == '.arvados#project':
592             return True
593         else:
594             return super(ProjectDirectory, self).__contains__(k)
595
596
597 class SharedDirectory(Directory):
598     '''A special directory that represents users or groups who have shared projects with me.'''
599
600     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
601                  poll=False, poll_time=60):
602         super(SharedDirectory, self).__init__(parent_inode)
603         self.inodes = inodes
604         self.api = api
605         self.num_retries = num_retries
606         self.current_user = api.users().current().execute(num_retries=num_retries)
607         self._poll = True
608         self._poll_time = poll_time
609
610     def update(self):
611         with llfuse.lock_released:
612             all_projects = arvados.util.list_all(
613                 self.api.groups().list, self.num_retries,
614                 filters=[['group_class','=','project']])
615             objects = {}
616             for ob in all_projects:
617                 objects[ob['uuid']] = ob
618
619             roots = []
620             root_owners = {}
621             for ob in all_projects:
622                 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
623                     roots.append(ob)
624                     root_owners[ob['owner_uuid']] = True
625
626             lusers = arvados.util.list_all(
627                 self.api.users().list, self.num_retries,
628                 filters=[['uuid','in', list(root_owners)]])
629             lgroups = arvados.util.list_all(
630                 self.api.groups().list, self.num_retries,
631                 filters=[['uuid','in', list(root_owners)]])
632
633             users = {}
634             groups = {}
635
636             for l in lusers:
637                 objects[l["uuid"]] = l
638             for l in lgroups:
639                 objects[l["uuid"]] = l
640
641             contents = {}
642             for r in root_owners:
643                 if r in objects:
644                     obr = objects[r]
645                     if "name" in obr:
646                         contents[obr["name"]] = obr
647                     if "first_name" in obr:
648                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
649
650             for r in roots:
651                 if r['owner_uuid'] not in objects:
652                     contents[r['name']] = r
653
654         # end with llfuse.lock_released, re-acquire lock
655
656         try:
657             self.merge(contents.items(),
658                        lambda i: i[0],
659                        lambda a, i: a.uuid == i[1]['uuid'],
660                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
661         except Exception:
662             _logger.exception()
663
664
665 class FileHandle(object):
666     '''Connects a numeric file handle to a File or Directory object that has
667     been opened by the client.'''
668
669     def __init__(self, fh, entry):
670         self.fh = fh
671         self.entry = entry
672
673
674 class Inodes(object):
675     '''Manage the set of inodes.  This is the mapping from a numeric id
676     to a concrete File or Directory object'''
677
678     def __init__(self):
679         self._entries = {}
680         self._counter = itertools.count(llfuse.ROOT_INODE)
681
682     def __getitem__(self, item):
683         return self._entries[item]
684
685     def __setitem__(self, key, item):
686         self._entries[key] = item
687
688     def __iter__(self):
689         return self._entries.iterkeys()
690
691     def items(self):
692         return self._entries.items()
693
694     def __contains__(self, k):
695         return k in self._entries
696
697     def add_entry(self, entry):
698         entry.inode = next(self._counter)
699         self._entries[entry.inode] = entry
700         return entry
701
702     def del_entry(self, entry):
703         llfuse.invalidate_inode(entry.inode)
704         del self._entries[entry.inode]
705
706 class Operations(llfuse.Operations):
707     '''This is the main interface with llfuse.  The methods on this object are
708     called by llfuse threads to service FUSE events to query and read from
709     the file system.
710
711     llfuse has its own global lock which is acquired before calling a request handler,
712     so request handlers do not run concurrently unless the lock is explicitly released
713     using "with llfuse.lock_released:"'''
714
715     def __init__(self, uid, gid, encoding="utf-8"):
716         super(Operations, self).__init__()
717
718         self.inodes = Inodes()
719         self.uid = uid
720         self.gid = gid
721         self.encoding = encoding
722
723         # dict of inode to filehandle
724         self._filehandles = {}
725         self._filehandles_counter = 1
726
727         # Other threads that need to wait until the fuse driver
728         # is fully initialized should wait() on this event object.
729         self.initlock = threading.Event()
730
731     def init(self):
732         # Allow threads that are waiting for the driver to be finished
733         # initializing to continue
734         self.initlock.set()
735
736     def access(self, inode, mode, ctx):
737         return True
738
739     def getattr(self, inode):
740         if inode not in self.inodes:
741             raise llfuse.FUSEError(errno.ENOENT)
742
743         e = self.inodes[inode]
744
745         entry = llfuse.EntryAttributes()
746         entry.st_ino = inode
747         entry.generation = 0
748         entry.entry_timeout = 300
749         entry.attr_timeout = 300
750
751         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
752         if isinstance(e, Directory):
753             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
754         elif isinstance(e, StreamReaderFile):
755             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
756         else:
757             entry.st_mode |= stat.S_IFREG
758
759         entry.st_nlink = 1
760         entry.st_uid = self.uid
761         entry.st_gid = self.gid
762         entry.st_rdev = 0
763
764         entry.st_size = e.size()
765
766         entry.st_blksize = 512
767         entry.st_blocks = (e.size()/512)+1
768         entry.st_atime = int(e.atime())
769         entry.st_mtime = int(e.mtime())
770         entry.st_ctime = int(e.mtime())
771
772         return entry
773
774     def lookup(self, parent_inode, name):
775         name = unicode(name, self.encoding)
776         _logger.debug("arv-mount lookup: parent_inode %i name %s",
777                       parent_inode, name)
778         inode = None
779
780         if name == '.':
781             inode = parent_inode
782         else:
783             if parent_inode in self.inodes:
784                 p = self.inodes[parent_inode]
785                 if name == '..':
786                     inode = p.parent_inode
787                 elif isinstance(p, Directory) and name in p:
788                     inode = p[name].inode
789
790         if inode != None:
791             return self.getattr(inode)
792         else:
793             raise llfuse.FUSEError(errno.ENOENT)
794
795     def open(self, inode, flags):
796         if inode in self.inodes:
797             p = self.inodes[inode]
798         else:
799             raise llfuse.FUSEError(errno.ENOENT)
800
801         if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
802             raise llfuse.FUSEError(errno.EROFS)
803
804         if isinstance(p, Directory):
805             raise llfuse.FUSEError(errno.EISDIR)
806
807         fh = self._filehandles_counter
808         self._filehandles_counter += 1
809         self._filehandles[fh] = FileHandle(fh, p)
810         return fh
811
812     def read(self, fh, off, size):
813         _logger.debug("arv-mount read %i %i %i", fh, off, size)
814         if fh in self._filehandles:
815             handle = self._filehandles[fh]
816         else:
817             raise llfuse.FUSEError(errno.EBADF)
818
819         # update atime
820         handle.entry._atime = time.time()
821
822         try:
823             with llfuse.lock_released:
824                 return handle.entry.readfrom(off, size)
825         except arvados.errors.NotFoundError as e:
826             _logger.warning("Block not found: " + str(e))
827             raise llfuse.FUSEError(errno.EIO)
828         except Exception:
829             _logger.exception()
830             raise llfuse.FUSEError(errno.EIO)
831
832     def release(self, fh):
833         if fh in self._filehandles:
834             del self._filehandles[fh]
835
836     def opendir(self, inode):
837         _logger.debug("arv-mount opendir: inode %i", inode)
838
839         if inode in self.inodes:
840             p = self.inodes[inode]
841         else:
842             raise llfuse.FUSEError(errno.ENOENT)
843
844         if not isinstance(p, Directory):
845             raise llfuse.FUSEError(errno.ENOTDIR)
846
847         fh = self._filehandles_counter
848         self._filehandles_counter += 1
849         if p.parent_inode in self.inodes:
850             parent = self.inodes[p.parent_inode]
851         else:
852             raise llfuse.FUSEError(errno.EIO)
853
854         # update atime
855         p._atime = time.time()
856
857         self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
858         return fh
859
860     def readdir(self, fh, off):
861         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
862
863         if fh in self._filehandles:
864             handle = self._filehandles[fh]
865         else:
866             raise llfuse.FUSEError(errno.EBADF)
867
868         _logger.debug("arv-mount handle.entry %s", handle.entry)
869
870         e = off
871         while e < len(handle.entry):
872             if handle.entry[e][1].inode in self.inodes:
873                 try:
874                     yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
875                 except UnicodeEncodeError:
876                     pass
877             e += 1
878
879     def releasedir(self, fh):
880         del self._filehandles[fh]
881
882     def statfs(self):
883         st = llfuse.StatvfsData()
884         st.f_bsize = 64 * 1024
885         st.f_blocks = 0
886         st.f_files = 0
887
888         st.f_bfree = 0
889         st.f_bavail = 0
890
891         st.f_ffree = 0
892         st.f_favail = 0
893
894         st.f_frsize = 0
895         return st
896
897     # The llfuse documentation recommends only overloading functions that
898     # are actually implemented, as the default implementation will raise ENOSYS.
899     # However, there is a bug in the llfuse default implementation of create()
900     # "create() takes exactly 5 positional arguments (6 given)" which will crash
901     # arv-mount.
902     # The workaround is to implement it with the proper number of parameters,
903     # and then everything works out.
904     def create(self, inode_parent, name, mode, flags, ctx):
905         raise llfuse.FUSEError(errno.EROFS)