3198: Add --file-cache and --directory-cache to arv-mount to specify desired
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8
9 from fusefile import StringFile, StreamReaderFile, ObjectFile
10 from fresh import FreshBase, convertTime, use_counter
11
12 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
13
# Module-wide logger shared by every directory class in this file.
_logger = logging.getLogger('arvados.arvados_fuse')


# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')

def sanitize_filename(dirty):
    """Return a version of `dirty` that is usable as a directory entry name.

    None is passed through unchanged.  The empty string and '.' become
    '_', '..' becomes '__', and in any other name every NUL or '/'
    character is replaced by '_'.
    """
    if dirty is None:
        return None

    # Names that cannot be used verbatim, mapped to their stand-ins.
    reserved = {'': '_', '.': '_', '..': '__'}
    if dirty in reserved:
        return reserved[dirty]

    return _disallowed_filename_characters.sub('_', dirty)
34
35
class Directory(FreshBase):
    """Generic dict-backed directory node.

    Every key of the backing dict is a file name and every value is the
    File or Directory object stored under that name.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: integer inode number of the containing directory

        :inodes: inode table object; this class calls its touch(),
        add_entry() and del_entry() methods

        Raises Exception when parent_inode is not an int.
        """
        super(Directory, self).__init__()

        # No inode number yet; it is assigned from outside this class.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    @use_counter
    def update(self):
        """Refresh the entries dict; a no-op here.

        Subclasses override this to reload their contents when the
        directory is stale.
        """
        pass

    def size(self):
        """Stub: always 0.

        Only used when computing the size of the disk footprint of the
        directory.
        """
        return 0

    def persisted(self):
        """Return False; subclasses may override."""
        return False

    def checkupdate(self):
        """Call update() if stale() reports true.

        An apiclient HttpError raised by update() is logged at debug level
        and otherwise ignored.
        """
        if not self.stale():
            return
        try:
            self.update()
        except apiclient.errors.HttpError as e:
            _logger.debug(e)

    @use_counter
    def __getitem__(self, item):
        """Return the entry named `item`, refreshing first if stale.

        Raises KeyError when there is no such entry.
        """
        self.checkupdate()
        return self._entries[item]

    @use_counter
    def items(self):
        """Return a list of (name, entry) pairs, refreshing first if stale."""
        self.checkupdate()
        return list(self._entries.items())

    @use_counter
    def __contains__(self, k):
        """Report whether an entry named `k` exists, refreshing first if stale."""
        self.checkupdate()
        return k in self._entries

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Bring the directory contents in line with `items`.

        Entries that are unchanged between the old and new contents are
        reused, entries that are new are created, and entries that no
        longer appear in `items` are deleted.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        previous = self._entries
        self._entries = {}
        changed = False
        for item in items:
            name = sanitize_filename(fn(item))
            if not name:
                # fn() asked for this item to be skipped.
                continue
            if name in previous and same(previous[name], item):
                # Carry the existing entry over; removing it from `previous`
                # keeps it out of the deletion pass below.
                self._entries[name] = previous.pop(name)
                continue
            created = new_entry(item)
            if created is not None:
                self._entries[name] = self.inodes.add_entry(created)
                changed = True

        # Whatever is still in `previous` was not matched by 'items' (or was
        # superseded by a newly created entry of the same name): drop it.
        for stale_name in previous:
            llfuse.invalidate_entry(self.inode, str(stale_name))
            self.inodes.del_entry(previous[stale_name])
            changed = True

        if changed:
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Nothing is done, and False is returned, when the directory is in
        use and `force` is not set.  Also returns False, with the entries
        dict restored, as soon as one child's clear() reports failure.
        Returns True once every entry has been removed.
        """

        if self.in_use() and not force:
            return False

        previous = self._entries
        self._entries = {}
        # First pass: every child must agree to be cleared.
        for name in previous:
            if not previous[name].clear(force):
                self._entries = previous
                return False
        # Second pass: drop the kernel's cached names and release the inodes.
        for name in previous:
            llfuse.invalidate_entry(self.inode, str(name))
            self.inodes.del_entry(previous[name])
        llfuse.invalidate_inode(self.inode)
        self.invalidate()
        return True

    def mtime(self):
        """Return the stored modification timestamp."""
        return self._mtime
165
166
class CollectionDirectory(Directory):
    """Root directory of a tree that exposes the contents of one collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection):
        """Create a directory for `collection`.

        :collection: either a dict (its 'uuid' becomes the locator and its
        'modified_at', if any, supplies the mtime) or a bare locator value,
        in which case the mtime starts at 0.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.collection_object_file = None
        self.collection_object = None
        self._manifest_size = 0
        if isinstance(collection, dict):
            self.collection_locator = collection['uuid']
            self._mtime = convertTime(collection.get('modified_at'))
        else:
            self.collection_locator = collection
            self._mtime = 0

    def same(self, i):
        """Report whether record `i` names this collection.

        True when either i['uuid'] or i['portable_data_hash'] equals the
        locator of this directory.
        """
        locator = self.collection_locator
        return i['uuid'] == locator or i['portable_data_hash'] == locator

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        # Dropping the cached record makes update() reload unconditionally.
        self.collection_object = None
        self.update()

    def new_collection(self, new_collection_object, coll_reader):
        """Replace the directory tree with the contents of `coll_reader`.

        Existing entries are force-cleared, `new_collection_object` becomes
        the current collection record, and a Directory / StreamReaderFile
        tree is built from the streams of the reader.
        """
        self.clear(force=True)

        self.collection_object = new_collection_object
        self._mtime = convertTime(self.collection_object.get('modified_at'))

        if self.collection_object_file is not None:
            self.collection_object_file.update(self.collection_object)

        for stream in coll_reader.all_streams():
            # Walk (creating as needed) the directory chain named by the stream.
            cwd = self
            for part in stream.name().split('/'):
                if part in ('', '.'):
                    continue
                partname = sanitize_filename(part)
                if partname not in cwd._entries:
                    cwd._entries[partname] = self.inodes.add_entry(
                        Directory(cwd.inode, self.inodes))
                cwd = cwd._entries[partname]
            # Attach the files of the stream to the innermost directory.
            for filename, reader in stream.files().items():
                cwd._entries[sanitize_filename(filename)] = self.inodes.add_entry(
                    StreamReaderFile(cwd.inode, reader, self.mtime()))

    def _log_manifest(self, log):
        """Emit the cached manifest text, if any, through the callable `log`."""
        if self.collection_object is not None and "manifest_text" in self.collection_object:
            log("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])

    def update(self):
        """Reload the collection; return True on success, False on error.

        Nothing is fetched when a record is already loaded and the locator
        matches the portable data hash pattern, or when the locator is
        None.  Errors are logged rather than raised.
        """
        try:
            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            with llfuse.lock_released:
                coll_reader = arvados.CollectionReader(
                    self.collection_locator, self.api, self.api.keep,
                    num_retries=self.num_retries)
                fetched = coll_reader.api_response() or {}
                # If the Collection only exists in Keep, there will be no API
                # response.  Fill in the fields we need.
                fetched.setdefault('uuid', self.collection_locator)
                fetched.setdefault("portable_data_hash", fetched["uuid"])
                if 'manifest_text' not in fetched:
                    fetched['manifest_text'] = coll_reader.manifest_text()
                coll_reader.normalize()
            # end with llfuse.lock_released, re-acquire lock

            current = self.collection_object
            if current is None or current["portable_data_hash"] != fetched["portable_data_hash"]:
                self.new_collection(fetched, coll_reader)

            self._manifest_size = len(coll_reader.manifest_text())
            _logger.debug("%s manifest_size %i", self, self._manifest_size)

            self.fresh()
            return True
        except arvados.errors.NotFoundError:
            _logger.exception("arv-mount %s: error", self.collection_locator)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            self._log_manifest(_logger.warning)
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            self._log_manifest(_logger.error)
        return False

    def __getitem__(self, item):
        """Return the entry named `item`.

        The name '.arvados#collection' yields an ObjectFile wrapping the
        collection record; it is created and registered on first request.
        """
        self.checkupdate()
        if item != '.arvados#collection':
            return super(CollectionDirectory, self).__getitem__(item)

        if self.collection_object_file is None:
            self.collection_object_file = ObjectFile(self.inode, self.collection_object)
            self.inodes.add_entry(self.collection_object_file)
        return self.collection_object_file

    def __contains__(self, k):
        """'.arvados#collection' is always present; other names defer to the base class."""
        return k == '.arvados#collection' or super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached collection record and its ObjectFile, then invalidate."""
        self.collection_object = None
        self.collection_object_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """Return True when a collection locator is set."""
        return self.collection_locator is not None

    def objsize(self):
        """Return 128 times the manifest length recorded by the last successful update().

        NOTE(review): presumably a memory-footprint estimate for cache
        accounting -- confirm against the caller.
        """
        return 128 * self._manifest_size
290
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries):
        """Create the directory; `api` and `num_retries` are handed to every
        CollectionDirectory created by a lookup."""
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries

    def __setattr__(self, name, value):
        """Set the attribute, and populate the directory once it has an inode.

        The first time `inode` receives a non-None value while the
        directory is still empty, a README file is added.  The root
        directory additionally gets a nested MagicDirectory named 'by_id'.
        """
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        # (The name test must come first: _entries does not exist yet when
        # the earliest attributes are being set.)
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries))

    def __contains__(self, k):
        """Report whether `k` names an entry, loading the collection on demand.

        Names already present are answered directly.  Any other name must
        match the portable data hash or uuid pattern; a CollectionDirectory
        is then created for it and kept only if its update() succeeds.
        Every failure yields False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        try:
            entry = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if entry.update():
                self._entries[k] = entry
                return True

            # The collection could not be loaded.  Release the inode that was
            # just allocated; otherwise every failed lookup leaves an
            # unreachable entry behind in the inode table.
            self.inodes.del_entry(entry)
            return False
        except Exception as err:
            _logger.debug('arv-mount exception keep %s', err)
            return False

    def __getitem__(self, item):
        """Return the entry named `item`, loading it on demand.

        Raises KeyError when `item` cannot be resolved to a collection.
        """
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        """Do nothing: entries of this directory are never cleared.

        Returns None, which Directory.clear() on a parent treats the same
        as a False result.
        """
        pass
358
359
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates every entry it holds."""

    def invalidate(self):
        """Invalidate this directory, then each of its entries.

        Failures are logged with a traceback and not propagated.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception:
            # Logger.exception() requires a message argument; calling it with
            # none raises TypeError from inside this handler.
            _logger.exception("arv-mount: error invalidating directory inode %s", self.inode)
368
369
class TagsDirectory(RecursiveInvalidateDirectory):
    """A special directory with one subdirectory per tag visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        """Create the directory; polling is always switched on."""
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Fetch the distinct tag names and merge them into the entries.

        The API request runs with the llfuse lock released.  A response
        without an "items" key leaves the directory untouched.
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True)
            tags = request.execute(num_retries=self.num_retries)

        if "items" not in tags:
            return

        def tag_name(i):
            return i['name']

        def same_tag(a, i):
            return a.tag == i['name']

        def make_tag_directory(i):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, same_tag, make_tag_directory)
391
392
class TagDirectory(Directory):
    """A special directory with one subdirectory per collection visible to the
    user that carries a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        """Create the directory for the tag named `tag`."""
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        """Fetch the tag links for this tag and merge their targets in.

        The API request runs with the llfuse lock released.  Each entry is
        named after the head_uuid of a link and is a CollectionDirectory
        for that uuid.
        """
        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            request = self.api.links().list(filters=tag_filters,
                                            select=['head_uuid'])
            taggedcollections = request.execute(num_retries=self.num_retries)

        def entry_name(i):
            return i['head_uuid']

        def same_collection(a, i):
            return a.collection_locator == i['head_uuid']

        def make_collection_directory(i):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])

        self.merge(taggedcollections['items'],
                   entry_name,
                   same_collection,
                   make_collection_directory)
419
420
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Create a directory for `project_object`, a record with a 'uuid' key."""
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time

    def createDirectory(self, i):
        """Build the entry object for contents record `i`, chosen by its uuid.

        Returns a CollectionDirectory for a collection, a ProjectDirectory
        for a group, a CollectionDirectory for a link whose head is a
        collection, an ObjectFile for any other uuid, and None when
        nothing applies.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # NOTE(review): this passes parent_inode where the other branches
            # pass self.inode -- confirm that is intended.
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def update(self):
        """Reload the project record and its contents, then merge them in.

        The ObjectFile for the project record is created on first use.  The
        API requests run with the llfuse lock released.
        """
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            """Return the entry name for record `i`, or None to skip it."""
            if 'name' in i:
                if i['name'] is None or len(i['name']) == 0:
                    return None
                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                    # collection or subproject
                    return i['name']
                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                    # name link
                    return i['name']
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    # something else: append the kind with 'arvados#' stripped
                    return "{}.{}".format(i['name'], i['kind'][8:])
            # No name, or a named record of a kind that is not shown.
            return None

        def samefn(a, i):
            """Report whether existing entry `a` still represents record `i`."""
            if isinstance(a, CollectionDirectory):
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid == i['uuid'] and not a.stale()
            return False

        with llfuse.lock_released:
            if group_uuid_pattern.match(self.uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.uuid):
                self.project_object = self.api.users().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)

            contents = arvados.util.list_all(self.api.groups().contents,
                                             self.num_retries, uuid=self.uuid)

        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   namefn,
                   samefn,
                   self.createDirectory)

    def __getitem__(self, item):
        """Return the entry named `item`.

        The name '.arvados#project' yields the ObjectFile for the project
        record.
        """
        self.checkupdate()
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """'.arvados#project' is always present; other names defer to the base class."""
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)

    def persisted(self):
        """Return False."""
        return False
513
514
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Create the directory and look up the current user.

        NOTE(review): `exclude` is accepted but never used, and polling is
        always switched on regardless of `poll` -- confirm that is intended.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Rebuild the list of shared project roots and merge it in.

        All API requests run with the llfuse lock released.  An error
        raised while merging is logged with a traceback and not propagated.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            # A root is a project that the current user does not own and
            # whose owner is not one of the listed projects.
            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            # Resolve the owners of the roots, which may be users or groups.
            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            # One entry per resolvable owner, keyed by its display name.
            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if "name" in obr:
                        contents[obr["name"]] = obr
                    if "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            # Roots whose owner could not be resolved are listed directly.
            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message argument; calling it with
            # none raises TypeError from inside this handler.
            _logger.exception("arv-mount: error updating shared directory")