3198: Implement cache management for directory objects.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8
9 from fusefile import StringFile, StreamReaderFile, ObjectFile
10 from fresh import FreshBase, convertTime
11
12 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
13
# Module-wide logger shared by every directory class in this file.
_logger = logging.getLogger('arvados.arvados_fuse')


# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
# The set is NUL and "/" (the path separator); sanitize_filename() replaces
# each occurrence with "_".
_disallowed_filename_characters = re.compile('[\x00/]')
21
def sanitize_filename(dirty):
    """Return *dirty* made safe for use as a single FUSE path component.

    ``None`` is passed through unchanged.  Names that would be empty or would
    collide with the special ``.``/``..`` entries get fixed replacements
    (``""`` and ``"."`` become ``"_"``, ``".."`` becomes ``"__"``).  Any other
    name has each NUL and ``/`` character replaced with ``"_"``.
    """
    if dirty is None:
        return None
    reserved_names = {'': '_', '.': '_', '..': '__'}
    if dirty in reserved_names:
        return reserved_names[dirty]
    return _disallowed_filename_characters.sub('_', dirty)
34
def use_counter(orig_func):
    """Decorate a method so its object counts as "in use" while it runs.

    The wrapper calls ``self.inc_use()`` before invoking *orig_func* and
    ``self.dec_use()`` afterwards, even when *orig_func* raises.  While the
    count is positive, ``Directory.clear()`` refuses to clear the object
    unless forced.
    """
    @functools.wraps(orig_func)
    def use_counter_wrapper(self, *args, **kwargs):
        # Acquire outside the try block: if inc_use() itself fails, the
        # counter was never raised and must not be lowered by dec_use().
        self.inc_use()
        try:
            return orig_func(self, *args, **kwargs)
        finally:
            self.dec_use()
    return use_counter_wrapper
44
45
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    A directory also carries a use count, raised while one of its
    ``@use_counter`` methods runs.  ``clear()`` leaves an in-use directory
    alone unless it is forced.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: the integer inode number of the parent directory.

        :inodes: the inode table; child entries are registered with
        ``inodes.add_entry()`` and removed with ``inodes.del_entry()``.

        Raises Exception if parent_inode is not an int.
        """
        super(Directory, self).__init__()

        # Stays None until this object is assigned an inode number.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()
        self.use_count = 0

    @use_counter
    def update(self):
        """Refresh the entries dict when the directory is stale.

        The base implementation does nothing; subclasses override it.
        """
        pass

    def size(self):
        """Disk footprint of the directory, used for cache accounting (stub: 0)."""
        return 0

    def in_use(self):
        """Return True if the use count is positive."""
        return self.use_count > 0

    def inc_use(self):
        self.use_count += 1

    def dec_use(self):
        self.use_count -= 1

    def checkupdate(self):
        """Call update() if the directory is stale.

        API server HTTP errors are logged at debug level and otherwise ignored;
        the existing entries are left as they were.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.debug(e)

    @use_counter
    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    @use_counter
    def items(self):
        self.checkupdate()
        return list(self._entries.items())

    @use_counter
    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries

    def fresh(self):
        """Notify the inode table via inodes.touch(), then mark this directory fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        The directory mtime is bumped only if an entry was added or removed.
        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not in found in 'items'
        for name, entry in oldentries.items():
            llfuse.invalidate_entry(self.inode, str(name))
            self.inodes.del_entry(entry)
            changed = True

        if changed:
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Subdirectories are cleared first (recursively, with the same *force*).
        If any of them refuses, this directory's entries are restored and
        False is returned; subdirectories cleared before the refusal stay
        cleared.  Otherwise every entry is invalidated in the kernel cache and
        removed from the inode table, this directory is invalidated, and True
        is returned.

        :force: clear even if this directory (or a subdirectory) is in use.
        """
        if self.in_use() and not force:
            return False

        oldentries = self._entries
        self._entries = {}

        # Test the entry object, not the dict key: keys are filenames.
        for entry in oldentries.values():
            if isinstance(entry, Directory) and not entry.clear(force):
                self._entries = oldentries
                return False

        # Remove every child (files too, as merge() does) so no inode is
        # left registered for an entry this directory no longer holds.
        for name, entry in oldentries.items():
            llfuse.invalidate_entry(self.inode, str(name))
            self.inodes.del_entry(entry)
        llfuse.invalidate_inode(self.inode)
        self.invalidate()
        return True

    def mtime(self):
        return self._mtime
184
185
class CollectionDirectory(Directory):
    """Represents the root of a directory tree holding a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection):
        """Create a directory for one collection.

        :collection: either an API record dict (its 'uuid' becomes the
        locator and its 'modified_at' the mtime) or a locator string
        (uuid or portable data hash).
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.collection_object_file = None
        self.collection_object = None
        if isinstance(collection, dict):
            self.collection_locator = collection['uuid']
            self._mtime = convertTime(collection.get('modified_at'))
        else:
            self.collection_locator = collection
            self._mtime = 0

    def same(self, i):
        """Return True if record i matches this collection by uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_object = None
        self.update()

    def new_collection(self, new_collection_object, coll_reader):
        """Replace the directory contents with a freshly loaded collection.

        :new_collection_object: the collection record to remember.

        :coll_reader: reader whose streams and files become the new entries.
        """
        # Clear before storing the new record: clear() calls invalidate(),
        # which resets self.collection_object to None.  Clearing afterwards
        # discarded the record just loaded, so every update() reloaded and
        # rebuilt the collection.
        self.clear(force=True)

        self.collection_object = new_collection_object

        self._mtime = convertTime(self.collection_object.get('modified_at'))

        if self.collection_object_file is not None:
            self.collection_object_file.update(self.collection_object)

        for s in coll_reader.all_streams():
            cwd = self
            for part in s.name().split('/'):
                if part != '' and part != '.':
                    partname = sanitize_filename(part)
                    if partname not in cwd._entries:
                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
                        # (hack until using new API)
                        # Keep stream subdirectories permanently "in use" so
                        # they are only cleared by a forced clear.
                        cwd._entries[partname].inc_use()
                        # end hack
                    cwd = cwd._entries[partname]
            for k, v in s.files().items():
                cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))

    def update(self):
        """Load or reload the collection.

        Returns True on success (or when there is nothing to load) and False
        if loading failed; failures are logged rather than raised.
        """
        try:
            # A portable data hash names immutable content: once loaded,
            # there is nothing to refresh.
            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            with llfuse.lock_released:
                coll_reader = arvados.CollectionReader(
                    self.collection_locator, self.api, self.api.keep,
                    num_retries=self.num_retries)
                new_collection_object = coll_reader.api_response() or {}
                # If the Collection only exists in Keep, there will be no API
                # response.  Fill in the fields we need.
                if 'uuid' not in new_collection_object:
                    new_collection_object['uuid'] = self.collection_locator
                if "portable_data_hash" not in new_collection_object:
                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
                if 'manifest_text' not in new_collection_object:
                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
                coll_reader.normalize()
            # end with llfuse.lock_released, re-acquire lock

            # Only rebuild the tree when the content actually changed.
            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                self.new_collection(new_collection_object, coll_reader)

            self.fresh()
            return True
        except arvados.errors.NotFoundError:
            _logger.exception("arv-mount %s: error", self.collection_locator)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        return False

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#collection':
            # Created lazily and registered in the inode table on first access.
            if self.collection_object_file is None:
                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
                self.inodes.add_entry(self.collection_object_file)
            return self.collection_object_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Mark stale and forget the loaded record so the next update() reloads it."""
        super(CollectionDirectory, self).invalidate()
        self.collection_object = None

    def clear(self, force=False):
        """Clear entries; refuses (returns False) when there is no collection locator."""
        if self.collection_locator is None:
            return False
        else:
            return super(CollectionDirectory, self).clear(force)
306
307
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries):
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries))

    def __contains__(self, k):
        """Return True if k names an existing collection, loading it on demand.

        A name that looks like a portable data hash or uuid is loaded as a
        CollectionDirectory; on success it becomes an entry of this directory.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        entry = None
        try:
            entry = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))
            loaded = entry.update()
        except Exception as e:
            _logger.debug('arv-mount exception keep %s', e)
            loaded = False

        if not loaded:
            # Don't leave an unreachable entry registered in the inode table.
            if entry is not None:
                self.inodes.del_entry(entry)
            return False

        if k in self._entries:
            # CollectionDirectory.update() releases llfuse.lock while it talks
            # to the API server, so a concurrent lookup may have added k in
            # the meantime; keep that entry and discard ours.
            self.inodes.del_entry(entry)
        else:
            self._entries[k] = entry
        return True

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        """Never clears; returns False so callers treat it as not cleared."""
        return False
375
376
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates every child entry."""

    def invalidate(self):
        """Invalidate this directory and then each entry it holds.

        Errors are logged with a traceback rather than raised.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for entry in self._entries.values():
                entry.invalidate()
        except Exception:
            # Logger.exception() requires a message; calling it with none
            # raised TypeError from inside this handler.
            _logger.exception("arv-mount: error invalidating directory %s", self.inode)
385
386
class TagsDirectory(RecursiveInvalidateDirectory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        # Always True here; only the interval is configurable.
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Fetch the distinct tag names and merge them as TagDirectory entries.

        If the API response has no "items" key, the entries are left unchanged.
        """
        with llfuse.lock_released:
            tag_query = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True)
            tags = tag_query.execute(num_retries=self.num_retries)
        # llfuse.lock is held again from here on.

        if "items" not in tags:
            return

        def tag_name(item):
            return item['name']

        def is_same_tag(existing, item):
            return existing.tag == item['name']

        def make_tag_dir(item):
            return TagDirectory(self.inode, self.inodes, self.api,
                                self.num_retries, item['name'],
                                poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, is_same_tag, make_tag_dir)
408
409
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        """List tag links named self.tag on collections; merge one entry per head_uuid."""
        tag_filters = [
            ['link_class', '=', 'tag'],
            ['name', '=', self.tag],
            ['head_uuid', 'is_a', 'arvados#collection'],
        ]
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=tag_filters,
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)

        def make_collection_dir(link):
            return CollectionDirectory(self.inode, self.inodes, self.api,
                                       self.num_retries, link['head_uuid'])

        self.merge(taggedcollections['items'],
                   lambda link: link['head_uuid'],
                   lambda existing, link: existing.collection_locator == link['head_uuid'],
                   make_collection_dir)
436
437
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Create a directory for a project (group) or user home.

        :project_object: API record for the project or user; its 'uuid' is
        used to fetch the contents.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time

    def createDirectory(self, i):
        """Build the File or Directory object for one project content item.

        Returns None for items that should not appear in the listing.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # The file is an entry of this directory, so its parent inode is
            # this directory's inode (not this directory's own parent).
            return ObjectFile(self.inode, i)
        else:
            return None

    def update(self):
        """Reload the project record and merge its contents as entries."""
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            # Entry name for content item i, or None to leave it out.
            if 'name' in i:
                if i['name'] is None or len(i['name']) == 0:
                    return None
                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                    # collection or subproject
                    return i['name']
                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                    # name link
                    return i['name']
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    # something else
                    return "{}.{}".format(i['name'], i['kind'][8:])
            else:
                return None

        def samefn(a, i):
            # Whether existing entry a can be kept for content item i.
            if isinstance(a, CollectionDirectory):
                if link_uuid_pattern.match(i['uuid']):
                    # Name links are shown as the collection they point at
                    # (createDirectory uses head_uuid as the locator), so
                    # compare against head_uuid rather than the link's uuid.
                    return a.collection_locator == i.get('head_uuid')
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid == i['uuid'] and not a.stale()
            return False

        with llfuse.lock_released:
            if group_uuid_pattern.match(self.uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.uuid):
                self.project_object = self.api.users().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)

            contents = arvados.util.list_all(self.api.groups().contents,
                                             self.num_retries, uuid=self.uuid)

        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   namefn,
                   samefn,
                   self.createDirectory)

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)
527
528
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        # NOTE(review): `exclude` is accepted but not used, and `poll` is
        # ignored (polling is always enabled below) — confirm this is intended.
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Rebuild the listing of owners of projects shared with the current user.

        A project is a "root" when it is not owned by the current user and
        its owner is not itself a visible project.  Each root owner that can
        be looked up as a user or group becomes an entry, named after the
        group's name or the user's first and last name.  Roots whose owner
        cannot be looked up are listed under their own name.  Merge errors
        are logged rather than raised.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if "name" in obr:
                        contents[obr["name"]] = obr
                    if "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message; calling it with none
            # raised TypeError from inside this handler.
            _logger.exception("arv-mount: error updating shared directory")