3198: Serialize updates to avoid doing redundant API server calls.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9
10 from fusefile import StringFile, ObjectFile, FuseArvadosFile
11 from fresh import FreshBase, convertTime, use_counter
12
13 import arvados.collection
14 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
15
16 _logger = logging.getLogger('arvados.arvados_fuse')
17
18
# Characters that neither FUSE nor Linux can accept inside a filename.
# (Any of these found in a collection filename shows up as an
# underscore in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')

def sanitize_filename(dirty):
    """Return a copy of `dirty` that is safe to use as a directory entry name.

    None is passed through unchanged.  The empty string and "." both map
    to "_", ".." maps to "__", and in any other name every NUL or "/"
    character is replaced with "_".
    """
    if dirty is None:
        return None
    # Names that are empty or collide with the special "." entry.
    if dirty in ('', '.'):
        return '_'
    # Name that collides with the special ".." entry.
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
36
37
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: the integer inode number of the parent directory

        :inodes: the inode table; this class calls its add_entry(),
        del_entry() and touch() methods

        Raises Exception if parent_inode is not an int.
        """
        super(Directory, self).__init__()

        # The inode number is unknown at construction time; it is assigned
        # from outside this class.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overridden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An apiclient HttpError raised by update() is logged as a warning
        and swallowed; any other exception propagates.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.warning(e)

    @use_counter
    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    @use_counter
    def items(self):
        self.checkupdate()
        return list(self._entries.items())

    @use_counter
    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries

    def fresh(self):
        """Mark the directory as fresh and touch it in the inode table."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
            llfuse.invalidate_entry(self.inode, str(i))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            llfuse.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Does nothing and returns False when the directory is in use, unless
        `force` is true.  Every child is cleared first; if any child's
        clear() returns a false value the original entries are put back and
        False is returned.  Returns True once all entries were removed.
        """

        if not self.in_use() or force:
            oldentries = self._entries
            self._entries = {}
            for n in oldentries:
                if not oldentries[n].clear(force):
                    # A child refused to clear: restore the entries dict.
                    self._entries = oldentries
                    return False
            for n in oldentries:
                llfuse.invalidate_entry(self.inode, str(n))
                self.inodes.del_entry(oldentries[n])
            llfuse.invalidate_inode(self.inode)
            self.invalidate()
            return True
        else:
            return False

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    # The mutating operations below are not supported by a generic
    # directory.  Their parameters mirror the signatures used by
    # CollectionDirectoryBase so that calling them with arguments raises
    # NotImplementedError rather than TypeError; the defaults keep the old
    # zero-argument call form working.

    def create(self, name=None):
        raise NotImplementedError()

    def mkdir(self, name=None):
        raise NotImplementedError()

    def unlink(self, name=None):
        raise NotImplementedError()

    def rmdir(self, name=None):
        raise NotImplementedError()

    def rename(self, name_old=None, name_new=None, src=None):
        raise NotImplementedError()
191
class CollectionDirectoryBase(Directory):
    """Directory whose entries mirror the items of a collection object.

    `self.collection` is the backing collection (or subcollection); the
    directory subscribes to its change events in populate() and forwards
    create/mkdir/unlink/rmdir/rename to it.
    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add a directory entry called `name` (sanitized) for `item`.

        If `item` already carries a fuse_entry, that entry is re-attached
        here; it must be marked dead and still have an inode, otherwise
        Exception is raised.  Otherwise a new CollectionDirectoryBase (for
        a RichCollectionBase item) or FuseArvadosFile is registered in the
        inode table.  In every case item.fuse_entry ends up pointing at the
        entry stored in this directory.
        """
        name = sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection change callback (registered by populate()).

        Events for collections other than self.collection are ignored.
        ADD creates an entry, DEL removes the entry and invalidates it,
        MOD invalidates the inode of the affected entry.
        """
        if collection == self.collection:
            _logger.debug("%s %s %s %s", event, collection, name, item)
            # Entries are stored under their sanitized name (see
            # new_entry), so lookups and invalidation must use the same
            # form.
            name = sanitize_filename(name)
            with llfuse.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    ent = self._entries[name]
                    del self._entries[name]
                    llfuse.invalidate_entry(self.inode, name)
                    self.inodes.del_entry(ent)
                elif event == arvados.collection.MOD:
                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                        llfuse.invalidate_inode(item.fuse_entry.inode)
                    elif name in self._entries:
                        llfuse.invalidate_inode(self._entries[name].inode)

    def populate(self, mtime):
        """Set the mtime, subscribe to collection events and add all items."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry, item in self.collection.items():
            self.new_entry(entry, item, self.mtime())

    def writable(self):
        return self.collection.writable()

    def flush(self):
        self.collection.root_collection().save()

    def create(self, name):
        # Opening for write and closing immediately creates the file.
        self.collection.open(name, "w").close()

    def mkdir(self, name):
        self.collection.mkdirs(name)

    def unlink(self, name):
        self.collection.remove(name)

    def rmdir(self, name):
        self.collection.remove(name)

    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        Raises llfuse.FUSEError(EPERM) when `src` is not a
        CollectionDirectoryBase.  Both directories are flushed afterwards.
        """
        # errno is not imported at module level in this file.
        import errno

        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)
        self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()
260
261
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree holding a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create a collection root directory.

        :collection_record: either a dict (its 'uuid' becomes the locator
        and its 'modified_at' the mtime) or the locator itself; may be None

        :explicit_collection: accepted but not used by this class
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot hit an
        # AttributeError when no locator was supplied.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        # Serializes update() so concurrent callers do not repeat the
        # same API server request.
        self._updating_lock = threading.Lock()

    def same(self, i):
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        Existing entries are force-cleared (only if an inode has been
        assigned), the record, mtime and locator are taken from
        `new_collection_record` when it is non-empty, and the directory is
        repopulated from the new collection object.
        """
        if self.inode:
            self.clear(force=True)

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    def update(self):
        """Refresh the directory from the API server if it is stale.

        Returns True when the directory is up to date afterwards (including
        when there was nothing to do) and False when an error occurred; the
        error is logged rather than raised.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed the directory while we
                        # waited for the lock; report success so callers
                        # testing the result do not treat it as a failure.
                        return True

                    _logger.debug("Updating %s", self.collection_locator)
                    # NOTE(review): truthiness test, unlike the
                    # "is not None" check in writable() - confirm whether
                    # an empty collection object can be falsy here.
                    if self.collection:
                        self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError:
            _logger.exception("arv-mount %s: error", self.collection_locator)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        return False

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#collection':
            # The record file is created lazily on first access.
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128
406
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries):
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries))

    def __contains__(self, k):
        """Return True if `k` is an existing entry or a loadable collection.

        A name that looks like a portable data hash or uuid is looked up by
        creating a CollectionDirectory and calling its update(); on success
        the directory is kept as entry `k`.  Any exception during the
        lookup is logged at debug level and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        ent = None
        kept = False
        try:
            ent = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if ent.update():
                # update() releases the llfuse lock, so another lookup may
                # have registered the same key meanwhile; keep the
                # existing entry in that case.
                if k not in self._entries:
                    self._entries[k] = ent
                    kept = True
                return True
            else:
                return False
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            return False
        finally:
            # Do not leave an unused inode registered when the lookup
            # failed or lost the race above.
            if ent is not None and not kept:
                self.inodes.del_entry(ent)

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        # Entries of the magic directory are never cleared.  Note that this
        # returns None, which Directory.clear() on a parent treats as
        # "child could not be cleared".
        pass
474
475
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates every direct entry."""

    def invalidate(self):
        """Invalidate this directory and each of its entries.

        Exceptions are logged with a traceback and not propagated.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # without one raises TypeError from inside this handler.
            _logger.exception("Error invalidating directory inode %s", self.inode)
484
485
class TagsDirectory(RecursiveInvalidateDirectory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        # Polling is always enabled for the tags listing.
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Fetch the distinct tag names and merge them into the entries.

        The API request runs with the llfuse lock released.  Nothing is
        merged when the response has no "items" key.
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True)
            tags = request.execute(num_retries=self.num_retries)

        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def is_same_tag(existing, link):
            return existing.tag == link['name']

        def make_tag_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, link['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, is_same_tag, make_tag_directory)
507
508
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        """Fetch the collections carrying this tag and merge them into the entries.

        The API request runs with the llfuse lock released.
        """
        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=tag_filters,
                select=['head_uuid'])
            taggedcollections = request.execute(num_retries=self.num_retries)

        def entry_name(link):
            return link['head_uuid']

        def is_same_collection(existing, link):
            return existing.collection_locator == link['head_uuid']

        def make_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, link['head_uuid'])

        self.merge(taggedcollections['items'],
                   entry_name,
                   is_same_collection,
                   make_collection_directory)
535
536
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Create a project directory.

        :project_object: dict describing the project; its 'uuid' is used
        to query the project contents
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.project_uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time
        # Serializes update() so concurrent callers do not repeat the
        # same API server requests.
        self._updating_lock = threading.Lock()

    def createDirectory(self, i):
        """Return a new entry object for contents item `i`, or None to skip it.

        Collections and links to collections become a CollectionDirectory,
        groups a ProjectDirectory, and any other object with a uuid an
        ObjectFile.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # NOTE(review): passes self.parent_inode, unlike the other
            # branches which pass self.inode - confirm this is intended.
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def uuid(self):
        return self.project_uuid

    def update(self):
        """Refresh the project record and contents if the directory is stale.

        The API requests run with the llfuse lock released and are
        serialized by self._updating_lock.  Returns None.
        """
        # The '.arvados#project' file is created on the first update.
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            # Directory entry name for a contents item, or None to skip it.
            if 'name' not in i:
                return None
            if i['name'] is None or len(i['name']) == 0:
                return None
            if collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                # collection or subproject
                return i['name']
            if link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                # name link
                return i['name']
            if 'kind' in i and i['kind'].startswith('arvados#'):
                # something else
                return "{}.{}".format(i['name'], i['kind'][8:])
            return None

        def samefn(a, i):
            # Whether existing entry `a` still represents contents item `i`.
            if isinstance(a, (CollectionDirectory, ProjectDirectory)):
                return a.uuid() == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid() == i['uuid'] and not a.stale()
            return False

        try:
            with llfuse.lock_released:
                self._updating_lock.acquire()
                if not self.stale():
                    # Another thread refreshed the directory while we
                    # waited for the lock.
                    return

                if group_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.groups().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
                elif user_uuid_pattern.match(self.project_uuid):
                    self.project_object = self.api.users().get(
                        uuid=self.project_uuid).execute(num_retries=self.num_retries)

                contents = arvados.util.list_all(self.api.groups().contents,
                                                 self.num_retries, uuid=self.project_uuid)

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents,
                       namefn,
                       samefn,
                       self.createDirectory)
        finally:
            self._updating_lock.release()

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)

    def persisted(self):
        return True
638
639
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Create the shared-projects directory.

        The current user record is fetched from the API server here.
        The `exclude` and `poll` arguments are accepted but not used;
        polling is always enabled.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Rebuild the list of owners and projects shared with the current user.

        The API requests run with the llfuse lock released.  An exception
        raised while merging the result is logged and not propagated.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            # A "root" is a project that is neither owned by the current
            # user nor by another project in the listing.
            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            # Look the root owners up both as users and as groups.
            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if obr.get("name"):
                        contents[obr["name"]] = obr
                    #elif obr.get("username"):
                    #    contents[obr["username"]] = obr
                    elif "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            # Roots whose owner could not be looked up are listed directly.
            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # without one raises TypeError from inside this handler.
            _logger.exception("Error updating shared directory inode %s", self.inode)