Merge branch 'master' into 3198-writable-fuse
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8
9 from fusefile import StringFile, ObjectFile, FuseArvadosFile
10 from fresh import FreshBase, convertTime, use_counter
11
12 import arvados.collection
13 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
14
_logger = logging.getLogger('arvados.arvados_fuse')


# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')

# Names that are valid strings but cannot be used as a directory entry
# as-is, mapped to the placeholder that is shown instead.
_reserved_filename_replacements = {
    '': '_',
    '.': '_',
    '..': '__',
}


def sanitize_filename(dirty):
    """Return a version of ``dirty`` that is usable as a FUSE filename.

    ``None`` is passed through unchanged.  The empty string and ``.``
    become ``_``, ``..`` becomes ``__``, and any other name has every NUL
    or ``/`` character replaced with ``_``.
    """
    if dirty is None:
        return None
    if dirty in _reserved_filename_replacements:
        return _reserved_filename_replacements[dirty]
    return _disallowed_filename_characters.sub('_', dirty)
35
36
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: the integer inode number of the containing directory.

        :inodes: the inode table that entries are registered with
        (``add_entry``) and removed from (``del_entry``).

        Raises Exception if ``parent_inode`` is not an int.
        """
        super(Directory, self).__init__()

        # Not known at construction time; assigned later from outside
        # this class.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overriden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; the base directory has no backing record."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        API HttpErrors raised by update() are logged at debug level and
        otherwise ignored, so the existing entries keep being served.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.debug(e)

    @use_counter
    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    @use_counter
    def items(self):
        self.checkupdate()
        return list(self._entries.items())

    @use_counter
    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries

    def fresh(self):
        """Mark the directory fresh and record it as recently used in the inode table."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        If several items map to the same (sanitized) name, only the first
        one is used; later duplicates are skipped.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if not name:
                continue
            if name in self._entries:
                # Another item already claimed this name during this merge.
                # Overwriting it would orphan the displaced entry in the
                # inode table (it is no longer in oldentries, so it would
                # never be deleted) and create a new inode on every refresh.
                continue
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries.pop(name)
            else:
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)
                    changed = True

        # delete any other directory entries that were not in found in 'items'
        for i in oldentries:
            llfuse.invalidate_entry(self.inode, str(i))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            llfuse.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Does nothing and returns False if the directory is in use, unless
        ``force`` is set.  Each child is asked to clear itself first.  If
        any child refuses, the entry table is restored and False is
        returned; children cleared before the refusal stay cleared.
        Returns True once all entries have been removed and the directory
        invalidated.
        """

        if not self.in_use() or force:
            oldentries = self._entries
            self._entries = {}
            for n in oldentries:
                if not oldentries[n].clear(force):
                    self._entries = oldentries
                    return False
            for n in oldentries:
                llfuse.invalidate_entry(self.inode, str(n))
                self.inodes.del_entry(oldentries[n])
            llfuse.invalidate_inode(self.inode)
            self.invalidate()
            return True
        else:
            return False

    def mtime(self):
        """Return the directory's modification time (seconds since the epoch)."""
        return self._mtime

    def writable(self):
        """Return False; the base directory is read-only."""
        return False

    def flush(self):
        """No-op; subclasses with backing storage override this."""
        pass
173
class CollectionDirectoryBase(Directory):
    """Directory whose entries mirror the items of an Arvados collection.

    Sub-collections become nested ``CollectionDirectoryBase`` objects and
    files become ``FuseArvadosFile`` objects.  ``populate`` subscribes to
    the collection so that later changes arrive through ``on_event``.
    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add an entry for collection item ``item`` under the sanitized ``name``.

        Sub-collections (``RichCollectionBase`` instances) are populated
        recursively with the given ``mtime``.
        """
        name = sanitize_filename(name)
        if isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))

    def on_event(self, event, collection, name, item):
        """Apply a change notification for this directory's collection.

        Events for other collections are ignored.  ADD creates an entry,
        DEL removes the entry and tells the kernel to drop it, and MOD
        invalidates the entry's inode.
        """
        _logger.debug("Got event! %s %s %s %s", event, collection, name, item)
        if collection == self.collection:
            # new_entry() stores entries under their sanitized name, so DEL
            # and MOD must look them up the same way; the raw name would
            # raise KeyError for any name containing disallowed characters.
            entry_name = sanitize_filename(name)
            with llfuse.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    ent = self._entries.pop(entry_name, None)
                    if ent is not None:
                        llfuse.invalidate_entry(self.inode, entry_name)
                        self.inodes.del_entry(ent)
                    else:
                        _logger.debug("DEL event for unknown entry %s", entry_name)
                elif event == arvados.collection.MOD:
                    ent = self._entries.get(entry_name)
                    if ent is not None:
                        llfuse.invalidate_inode(ent.inode)
                    else:
                        _logger.debug("MOD event for unknown entry %s", entry_name)
        _logger.debug("Finished handling event")

    def populate(self, mtime):
        """Subscribe to the collection and create an entry for each current item."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry, item in self.collection.items():
            self.new_entry(entry, item, self.mtime())

    def writable(self):
        """Return whatever the underlying collection reports for writable()."""
        return self.collection.writable()

    def flush(self):
        """Save the root collection that this (sub)collection belongs to."""
        self.collection.root_collection().save()
214
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree holding a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create a directory for the collection named by ``collection_record``.

        ``collection_record`` may be an API record (dict with ``uuid``), a
        locator string (uuid or portable data hash), or None.  Nothing is
        loaded until ``update`` runs.  ``explicit_collection`` is accepted
        but not used here.
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_object_file = None
        self.collection_object = None
        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # writable() falls back to this until a collection is loaded, so it
        # must exist even when there is no locator (previously it was only
        # set for a truthy locator, and writable() raised AttributeError).
        # A uuid locator is loaded as arvados.collection.Collection in
        # update(); anything else is loaded with CollectionReader.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)

    def same(self, i):
        """Return True if record ``i`` has this directory's locator as uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_object = None
        self.update()

    def new_collection(self, new_collection_object, coll_reader):
        """Replace the directory contents with those of ``coll_reader``.

        Existing entries are force-cleared (if this directory already has an
        inode), the API record and its ``.arvados#collection`` file are
        updated, and the new collection is populated.
        """
        if self.inode:
            self.clear(force=True)

        self.collection_object = new_collection_object

        if self.collection_object:
            self._mtime = convertTime(self.collection_object.get('modified_at'))

            if self.collection_object_file is not None:
                self.collection_object_file.update(self.collection_object)

        self.collection = coll_reader
        self.populate(self.mtime())

    def update(self):
        """Load or reload the collection.

        Returns True on success and False on failure; errors are logged
        rather than raised.
        """
        try:
            # Checked first: portable_data_hash_pattern.match(None) would
            # raise TypeError.
            if self.collection_locator is None:
                self.fresh()
                return True

            # Once loaded, a collection addressed by portable data hash is
            # not reloaded.
            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            with llfuse.lock_released:
                if uuid_pattern.match(self.collection_locator):
                    coll_reader = arvados.collection.Collection(
                        self.collection_locator, self.api, self.api.keep,
                        num_retries=self.num_retries)
                else:
                    coll_reader = arvados.collection.CollectionReader(
                        self.collection_locator, self.api, self.api.keep,
                        num_retries=self.num_retries)
                new_collection_object = coll_reader.api_response() or {}
                # If the Collection only exists in Keep, there will be no API
                # response.  Fill in the fields we need.
                if 'uuid' not in new_collection_object:
                    new_collection_object['uuid'] = self.collection_locator
                if "portable_data_hash" not in new_collection_object:
                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
                if 'manifest_text' not in new_collection_object:
                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
            # end with llfuse.lock_released, re-acquire lock

            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                self.new_collection(new_collection_object, coll_reader)

            self._manifest_size = len(coll_reader.manifest_text())
            _logger.debug("%s manifest_size %i", self, self._manifest_size)

            self.fresh()
            return True
        except arvados.errors.NotFoundError:
            _logger.exception("arv-mount %s: error", self.collection_locator)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        return False

    def __getitem__(self, item):
        """Look up an entry; ``.arvados#collection`` is a file holding the API record."""
        self.checkupdate()
        if item == '.arvados#collection':
            if self.collection_object_file is None:
                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
                self.inodes.add_entry(self.collection_object_file)
            return self.collection_object_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Drop the cached API record and its file, then invalidate as usual."""
        self.collection_object = None
        self.collection_object_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # Rough memory estimate: 128 bytes per byte of manifest text.
        return self._manifest_size * 128
341
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries):
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries))

    def __contains__(self, k):
        """Return True if ``k`` is a known entry or names a loadable collection.

        A uuid or portable data hash that loads successfully is added as a
        new entry.  A candidate that fails to load is removed from the inode
        table again; previously every failed probe left an orphaned entry
        there.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        e = None
        try:
            e = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))
            found = e.update()
        except Exception as ex:
            _logger.debug('arv-mount exception keep %s', ex)
            found = False

        if found:
            self._entries[k] = e
            return True

        if e is not None:
            self.inodes.del_entry(e)
        return False

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        # Overridden as a no-op: entries of this directory are never cleared.
        pass
409
410
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates each of its entries."""

    def invalidate(self):
        """Invalidate this directory, then call invalidate() on every entry.

        Any exception is logged rather than propagated.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception:
            # Logger.exception() requires a message; calling it with no
            # arguments raised TypeError from inside this handler.
            _logger.exception("arv-mount: error invalidating directory entries")
419
420
class TagsDirectory(RecursiveInvalidateDirectory):
    """A special directory that contains as subdirectories all tags visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        # Tag subdirectories are always created with polling enabled.
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Rebuild the entries from the distinct names of all tag links.

        The directory is left untouched if the response has no "items".
        """
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                )
            tags = request.execute(num_retries=self.num_retries)

        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def is_same_tag(existing, link):
            return existing.tag == link['name']

        def make_tag_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api,
                                self.num_retries, link['name'],
                                poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, is_same_tag, make_tag_directory)
442
443
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        """Rebuild the entries: one CollectionDirectory per collection carrying ``self.tag``.

        Each entry is named after the tag link's ``head_uuid``.
        """
        link_filters = [['link_class', '=', 'tag'],
                        ['name', '=', self.tag],
                        ['head_uuid', 'is_a', 'arvados#collection']]
        with llfuse.lock_released:
            request = self.api.links().list(filters=link_filters,
                                            select=['head_uuid'])
            taggedcollections = request.execute(num_retries=self.num_retries)

        def head_of(link):
            return link['head_uuid']

        def is_same_collection(existing, link):
            return existing.collection_locator == head_of(link)

        def make_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api,
                                       self.num_retries, head_of(link))

        self.merge(taggedcollections['items'], head_of,
                   is_same_collection, make_collection_directory)
470
471
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time

    def createDirectory(self, i):
        """Create the entry object for item ``i`` of the project contents.

        Returns a CollectionDirectory for a collection, or for a link whose
        head is a collection or a portable data hash.  Returns a
        ProjectDirectory for a group, an ObjectFile for any other object
        with a uuid, or None to skip the item.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # The file is an entry of this directory, so its parent is this
            # directory's inode, as for every other entry created here.
            return ObjectFile(self.inode, i)
        else:
            return None

    def update(self):
        """Refetch the project record and rebuild the entries from its contents."""
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            if 'name' in i:
                if i['name'] is None or len(i['name']) == 0:
                    return None
                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                    # collection or subproject
                    return i['name']
                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                    # name link
                    return i['name']
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    # something else
                    return "{}.{}".format(i['name'], i['kind'][8:])
            else:
                return None

        def samefn(a, i):
            if isinstance(a, CollectionDirectory):
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid == i['uuid'] and not a.stale()
            return False

        with llfuse.lock_released:
            if group_uuid_pattern.match(self.uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.uuid):
                self.project_object = self.api.users().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)

            contents = arvados.util.list_all(self.api.groups().contents,
                                             self.num_retries, uuid=self.uuid)

        # end with llfuse.lock_released, re-acquire lock

        # Keep '.arvados#project' in sync with the record just fetched;
        # otherwise it keeps showing the record from when it was created.
        self.project_object_file.update(self.project_object)

        self.merge(contents,
                   namefn,
                   samefn,
                   self.createDirectory)

    def __getitem__(self, item):
        """Look up an entry; ``.arvados#project`` is a file holding the project record."""
        self.checkupdate()
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)

    def persisted(self):
        return False

    def objsize(self):
        # Rough memory estimate: 1024 bytes per field of the project record.
        return len(self.project_object) * 1024 if self.project_object else 0
567
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): the ``poll`` argument is ignored here, and polling is
        # always enabled for the project subdirectories.  ``exclude`` is
        # also unused in this class.
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Rebuild the entries from the visible projects not owned by the current user.

        A "root" project is one whose owner is neither the current user nor
        another visible project.  Each root's owner (user or group) becomes
        an entry named after it.  A root whose owner record cannot be
        fetched becomes an entry under its own project name.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if "name" in obr:
                        contents[obr["name"]] = obr
                    if "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message; calling it with no
            # arguments raised TypeError from inside this handler.
            _logger.exception("arv-mount: error updating shared directory")