Merge branch '3198-inode-cache' into 3198-writable-fuse, fix tests.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8
9 from fusefile import StringFile, ObjectFile, FuseArvadosFile
10 from fresh import FreshBase, convertTime, use_counter
11
12 import arvados.collection
13 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
14
# Logger shared by every class in this module.
_logger = logging.getLogger('arvados.arvados_fuse')


# Match any character which FUSE or Linux cannot accommodate as part
# of a filename: NUL and the path separator "/".  (If present in a
# collection filename, they will appear as underscores in the fuse
# mount; see sanitize_filename().)
_disallowed_filename_characters = re.compile('[\x00/]')
22
def sanitize_filename(dirty):
    """Return a version of *dirty* that can be used as a FUSE filename.

    None is passed through unchanged.  The names '' and '.' become '_' and
    '..' becomes '__'; in any other name each NUL or "/" character is
    replaced with "_".
    """
    if dirty is None:
        return None
    reserved_names = {'': '_', '.': '_', '..': '__'}
    if dirty in reserved_names:
        return reserved_names[dirty]
    return _disallowed_filename_characters.sub('_', dirty)
35
36
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: the integer inode number of the containing directory.

        :inodes: the inode table that entries are registered with
        (add_entry) and removed from (del_entry).

        Raises TypeError if parent_inode is not an int.
        """
        super(Directory, self).__init__()

        # None until this directory is given an inode number.
        self.inode = None
        if not isinstance(parent_inode, int):
            # TypeError is a subclass of Exception, so existing
            # "except Exception" handlers still catch it.
            raise TypeError("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        # Maps sanitized entry name -> File or Directory object.
        self._entries = {}
        self._mtime = time.time()

    # Overridden by subclasses to implement logic to update the entries dict
    # when the directory is stale.
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub).
    def size(self):
        return 0

    def persisted(self):
        """Return whether this directory is persisted; always False here."""
        return False

    def checkupdate(self):
        """Run update() if the directory is stale.

        apiclient HttpError exceptions raised by update() are logged at
        debug level and suppressed, leaving the current entries in place.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.debug(e)

    @use_counter
    def __getitem__(self, item):
        """Return the entry named *item*, refreshing first if stale."""
        self.checkupdate()
        return self._entries[item]

    @use_counter
    def items(self):
        """Return a list of (name, entry) pairs, refreshing first if stale."""
        self.checkupdate()
        return list(self._entries.items())

    @use_counter
    def __contains__(self, k):
        """Return True if an entry named *k* exists, refreshing first if stale."""
        self.checkupdate()
        return k in self._entries

    def fresh(self):
        """Touch this directory in the inode table and mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuses
        entries that are the same in both the old and new lists, creates new
        entries, and deletes old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list; a None result skips the item.

        """
        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if not name:
                continue
            # NOTE(review): if two items map to the same name, the later one
            # replaces the earlier in _entries while the earlier object stays
            # registered in the inode table.
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries.pop(name)
            else:
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    changed = True

        # delete any other directory entries that were not found in 'items'
        for name, ent in oldentries.items():
            # NOTE(review): str() raises UnicodeEncodeError for non-ASCII
            # unicode names under Python 2 -- confirm the encoding llfuse
            # expects here.
            llfuse.invalidate_entry(self.inode, str(name))
            self.inodes.del_entry(ent)
            changed = True

        if changed:
            llfuse.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Returns False without changing anything if the directory is in use
        (unless *force* is True) or if any child's clear() returns a false
        value.  Otherwise removes every entry from the inode table,
        invalidates this directory and returns True.
        """
        if self.in_use() and not force:
            return False

        oldentries = self._entries
        self._entries = {}
        for ent in oldentries.values():
            if not ent.clear(force):
                # A child refused to clear: put the entries back.
                self._entries = oldentries
                return False
        for name, ent in oldentries.items():
            llfuse.invalidate_entry(self.inode, str(name))
            self.inodes.del_entry(ent)
        llfuse.invalidate_inode(self.inode)
        self.invalidate()
        return True

    def mtime(self):
        """Return the directory's modification time."""
        return self._mtime

    def writable(self):
        """Return whether entries can be modified; always False here."""
        return False

    def flush(self):
        """Flush pending changes; nothing to do for a plain Directory."""
        pass
173
class CollectionDirectoryBase(Directory):
    """Directory whose entries mirror the items of an arvados collection.

    populate() creates one entry per collection item and subscribes
    on_event() to the collection so later changes are applied to the
    entries.
    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        # Collection (or subcollection) object shown by this directory; may
        # be None until a subclass loads one.
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add an entry for collection item *item* under (sanitized) *name*.

        RichCollectionBase items become nested CollectionDirectoryBase
        directories, populated immediately; other items become
        FuseArvadosFile objects.
        """
        name = sanitize_filename(name)
        if isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))

    def on_event(self, event, collection, name, item):
        """Apply an ADD/DEL/MOD change notification to the entries.

        Events for a collection other than self.collection are ignored.
        Entries are modified while holding llfuse.lock.
        """
        # Per-event tracing belongs at debug level, not WARNING.
        _logger.debug("Got event! %s %s %s %s", event, collection, name, item)
        if collection == self.collection:
            # Entries are keyed by sanitized names (see new_entry), so look
            # them up and invalidate them the same way.
            name = sanitize_filename(name)
            with llfuse.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    ent = self._entries.pop(name, None)
                    if ent is not None:
                        llfuse.invalidate_entry(self.inode, name)
                        self.inodes.del_entry(ent)
                elif event == arvados.collection.MOD:
                    ent = self._entries.get(name)
                    if ent is not None:
                        llfuse.invalidate_inode(ent.inode)
        _logger.debug("Finished handling event")

    def populate(self, mtime):
        """Subscribe to collection changes and add an entry for every item.

        :mtime: modification time given to this directory and its new entries.
        """
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry, item in self.collection.items():
            self.new_entry(entry, item, self.mtime())

    def writable(self):
        """Return whether the underlying collection is writable."""
        return self.collection.writable()

    def flush(self):
        """Save the root collection this (sub)collection belongs to."""
        self.collection.root_collection().save()
214
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree holding a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create a directory for a collection that update() loads on demand.

        :collection_record: an API record dict (its 'uuid' becomes the
        locator), a locator string (uuid or portable data hash), or None.

        :explicit_collection: accepted for compatibility; not used here.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() works before a collection is
        # loaded, even when no locator was given.  Only locators matching
        # uuid_pattern are treated as writable.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)

    def same(self, i):
        """Return True if record *i* refers to this directory's collection."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Return whether the collection can be modified through this mount."""
        return self.collection.writable() if self.collection is not None else self._writable

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        # Drop the current collection object; otherwise update() would just
        # refresh the old collection instead of loading new_locator.  Events
        # still arriving from the old object are ignored by on_event(), which
        # only acts on self.collection.
        self.collection = None
        self._writable = bool(new_locator) and uuid_pattern.match(new_locator) is not None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of *coll_reader*.

        If this directory already has an inode, existing entries are cleared
        (forcibly) first.  Expects llfuse.lock to be held, as update() does
        when it calls this.
        """
        if self.inode:
            self.clear(force=True)

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        """Return the collection locator (uuid or portable data hash)."""
        return self.collection_locator

    def update(self):
        """Load the collection, or refresh an already-loaded one.

        Returns True on success or when there is nothing to do, and False if
        loading failed (the error is logged rather than raised).
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Already loaded by portable data hash: nothing to refresh.
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            coll_reader = None
            new_collection_record = None
            manifest_size = 0
            with llfuse.lock_released:
                _logger.debug("Updating %s", self.collection_locator)
                # Test against None (as writable() does) rather than relying
                # on truthiness: a container-like collection object may
                # evaluate false when it is empty.
                if self.collection is not None:
                    self.collection.update()
                else:
                    if uuid_pattern.match(self.collection_locator):
                        coll_reader = arvados.collection.Collection(
                            self.collection_locator, self.api, self.api.keep,
                            num_retries=self.num_retries)
                    else:
                        coll_reader = arvados.collection.CollectionReader(
                            self.collection_locator, self.api, self.api.keep,
                            num_retries=self.num_retries)
                    new_collection_record = coll_reader.api_response() or {}
                    # If the Collection only exists in Keep, there will be no API
                    # response.  Fill in the fields we need.
                    if 'uuid' not in new_collection_record:
                        new_collection_record['uuid'] = self.collection_locator
                    if "portable_data_hash" not in new_collection_record:
                        new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                    if 'manifest_text' not in new_collection_record:
                        new_collection_record['manifest_text'] = coll_reader.manifest_text()
                    manifest_size = len(coll_reader.manifest_text())
            # end with llfuse.lock_released, re-acquire lock

            if coll_reader is not None:
                # Rebuild the entries (inode table changes) only now that
                # llfuse.lock is held again, as the other directory classes
                # do before calling merge().
                if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                    self.new_collection(new_collection_record, coll_reader)

                self._manifest_size = manifest_size
                _logger.debug("%s manifest_size %i", self, self._manifest_size)

            self.fresh()
            return True
        except arvados.errors.NotFoundError:
            _logger.exception("arv-mount %s: error", self.collection_locator)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        return False

    def __getitem__(self, item):
        """Return entry *item*.

        The special name '.arvados#collection' returns an ObjectFile built
        from the collection record, created on first access.
        """
        self.checkupdate()
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """'.arvados#collection' is always present; otherwise check entries."""
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached collection record and record file, then invalidate."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """Return True when this directory refers to a collection locator."""
        return (self.collection_locator is not None)

    def objsize(self):
        # Size estimate: 128 times the length of the last loaded manifest text.
        return self._manifest_size * 128
348
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries):
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries))

    def __contains__(self, k):
        """Return True if *k* names a loadable collection, loading it on demand.

        Names that are neither a portable data hash nor a uuid are rejected
        without contacting the server.  Loading errors are logged at debug
        level and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        # Entry registered in the inode table but not (yet) owned by
        # self._entries; unregistered in the finally clause if not kept.
        entry = None
        try:
            entry = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if entry.update():
                # update() releases llfuse.lock, so another lookup may have
                # added the same collection meanwhile; keep the first one.
                if k not in self._entries:
                    self._entries[k] = entry
                    entry = None
                return True
            else:
                return False
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            return False
        finally:
            if entry is not None:
                self.inodes.del_entry(entry)

    def __getitem__(self, item):
        """Return the collection directory for *item*, loading it if needed."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        # Never clears anything.  Returns None (falsy), so a parent
        # Directory.clear() treats this child as not clearable.
        pass
416
417
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates each of its entries."""

    def invalidate(self):
        """Invalidate this directory, then every entry it contains.

        Any exception is logged with its traceback instead of propagating.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for entry in self._entries.values():
                entry.invalidate()
        except Exception:
            # Logger.exception() requires a message; calling it with no
            # arguments raised TypeError from inside this handler.
            _logger.exception("arv-mount: error invalidating directory")
426
427
class TagsDirectory(RecursiveInvalidateDirectory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        # There is no poll argument here: _poll is always True and only
        # poll_time is configurable.
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Rebuild one TagDirectory per distinct tag link name."""
        with llfuse.lock_released:
            query = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True)
            tags = query.execute(num_retries=self.num_retries)
        # llfuse.lock is held again from here on.
        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def is_same_tag(existing, link):
            return existing.tag == link['name']

        def make_tag_dir(link):
            return TagDirectory(self.inode, self.inodes, self.api,
                                self.num_retries, link['name'],
                                poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, is_same_tag, make_tag_dir)
449
450
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        """Rebuild one CollectionDirectory per collection carrying self.tag."""
        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=tag_filters,
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)
        # llfuse.lock is held again from here on.

        def head_uuid(link):
            return link['head_uuid']

        def is_same_collection(existing, link):
            return existing.collection_locator == head_uuid(link)

        def make_collection_dir(link):
            return CollectionDirectory(self.inode, self.inodes, self.api,
                                       self.num_retries, head_uuid(link))

        self.merge(taggedcollections['items'], head_uuid,
                   is_same_collection, make_collection_dir)
477
478
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        # NOTE: this instance attribute shadows the uuid() method below, so
        # on instances self.uuid is the uuid string.
        self.uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time

    def createDirectory(self, i):
        """Create the entry object for project content item *i*.

        Returns a CollectionDirectory, ProjectDirectory or ObjectFile, or
        None for items that should not appear in the directory.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # The file is an entry of this directory, so its parent is
            # self.inode -- as for every other entry created here.
            return ObjectFile(self.inode, i)
        else:
            return None

    def uuid(self):
        # Shadowed on instances by the uuid attribute set in __init__; kept
        # so the class interface is unchanged.
        return self.uuid

    def update(self):
        """Reload the project record and its contents from the API server."""
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            # Entry name for content item i, or None to skip the item.
            if 'name' in i:
                if i['name'] is None or len(i['name']) == 0:
                    return None
                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                    # collection or subproject
                    return i['name']
                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                    # name link
                    return i['name']
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    # something else: append the kind without its
                    # "arvados#" prefix as an extension
                    return "{}.{}".format(i['name'], i['kind'][8:])
            else:
                return None

        def samefn(a, i):
            # Whether existing entry a can be kept for content item i.
            if isinstance(a, CollectionDirectory):
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid == i['uuid'] and not a.stale()
            return False

        with llfuse.lock_released:
            if group_uuid_pattern.match(self.uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.uuid):
                self.project_object = self.api.users().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)

            contents = arvados.util.list_all(self.api.groups().contents,
                                             self.num_retries, uuid=self.uuid)

        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   namefn,
                   samefn,
                   self.createDirectory)

    def __getitem__(self, item):
        """Return entry *item*; '.arvados#project' returns the project record file."""
        self.checkupdate()
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """'.arvados#project' is always present; otherwise check entries."""
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)

    def persisted(self):
        return False
574
575
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): the poll and exclude arguments are ignored and _poll
        # is always True.  Honoring poll would change behavior for callers
        # relying on the default -- confirm intent before changing.
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Rebuild the entries from the projects shared with the current user.

        Each top-level shared project is represented by its owning user
        (named "first last") or group (named by its name).  If the owner
        record cannot be read, the project itself is listed under its own
        name.  Errors while merging are logged, not raised.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {ob['uuid']: ob for ob in all_projects}

            # Root projects: not owned by me and not inside another
            # project visible to me.  Remember their owners.
            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if "name" in obr:
                        contents[obr["name"]] = obr
                    if "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message; calling it with no
            # arguments raised TypeError from inside this handler.
            _logger.exception("arv-mount: error updating shared directory")