3198: Can modify file, collection objects, changes are reflected in FUSE.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8
9 from fusefile import StringFile, ObjectFile, FuseArvadosFile
10 from fresh import FreshBase, convertTime, use_counter
11
12 import arvados.collection
13 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
14
_logger = logging.getLogger('arvados.arvados_fuse')


# Pattern for the characters that neither FUSE nor Linux will accept
# inside a filename.  (A collection filename containing one of them
# shows up in the fuse mount with an underscore in its place.)
_disallowed_filename_characters = re.compile('[\x00/]')

def sanitize_filename(dirty):
    """Return a version of `dirty` that is usable as a directory entry name.

    None is passed through unchanged.  The empty string and '.' become
    '_', and '..' becomes '__'.  In any other name, every NUL or '/'
    character is replaced with '_'.
    """
    if dirty is None:
        return None
    if dirty in ('', '.'):
        return '_'
    if dirty == '..':
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
35
36
class Directory(FreshBase):
    """Base class for directory nodes whose contents live in a plain dict.

    Every key of the dict is a filename and every value is the File or
    Directory object that the name refers to.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: integer inode number of the containing directory;
        an Exception is raised for anything that is not an int

        :inodes: the inode table; this class uses its add_entry(),
        del_entry() and touch() methods
        """
        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    # Subclasses override this with the logic that refreshes the entries
    # dict once the directory has gone stale.
    @use_counter
    def update(self):
        pass

    # Stub; only consulted when computing the disk footprint of the
    # directory.
    def size(self):
        return 0

    def persisted(self):
        """A generic directory reports itself as not persisted."""
        return False

    def checkupdate(self):
        """Run update() if the directory is stale.

        An HttpError raised by update() is logged at debug level and
        otherwise ignored.
        """
        if not self.stale():
            return
        try:
            self.update()
        except apiclient.errors.HttpError as err:
            _logger.debug(err)

    @use_counter
    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    @use_counter
    def items(self):
        self.checkupdate()
        return list(self._entries.items())

    @use_counter
    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Bring the directory contents in line with a new listing.

        Entries that are unchanged between the old and new listings are
        reused, entries that are new are created, and old entries that no
        longer appear are deleted.

        :items: iterable with new directory contents

        :fn: function mapping an entry of 'items' to the desired file or
        directory name, or to None if the entry should be skipped

        :same: function comparing an existing entry (a File or Directory
        object) with an entry of 'items' to decide whether the existing
        entry can be kept

        :new_entry: function building a new directory entry (File or
        Directory object) from an entry of 'items'; it may return None, in
        which case nothing is added for that item

        """

        previous = self._entries
        self._entries = {}
        changed = False
        for item in items:
            name = sanitize_filename(fn(item))
            if not name:
                continue

            if name in previous and same(previous[name], item):
                # carry the existing directory entry over unchanged
                self._entries[name] = previous.pop(name)
                continue

            # otherwise build a replacement directory entry
            created = new_entry(item)
            if created is None:
                continue
            self._entries[name] = self.inodes.add_entry(created)
            changed = True

        # whatever is left in 'previous' was not found in 'items': drop it
        for leftover in previous:
            llfuse.invalidate_entry(self.inode, str(leftover))
            self.inodes.del_entry(previous[leftover])
            changed = True

        if changed:
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries.

        Returns True when the directory was emptied.  Returns False, with
        the entries dict restored, when the directory is in use and
        `force` is not set, or when clearing any child reports failure.
        """

        if self.in_use() and not force:
            return False

        previous = self._entries
        self._entries = {}

        # First ask every child to clear itself; one refusal aborts the
        # whole operation and puts the entries dict back.
        for name in previous:
            if not previous[name].clear(force):
                self._entries = previous
                return False

        # Only then invalidate the names and drop the children.
        for name in previous:
            llfuse.invalidate_entry(self.inode, str(name))
            self.inodes.del_entry(previous[name])

        llfuse.invalidate_inode(self.inode)
        self.invalidate()
        return True

    def mtime(self):
        """Return the time recorded by the last content change."""
        return self._mtime

    def writable(self):
        """A generic directory is read-only."""
        return False
169
170
class CollectionDirectoryBase(Directory):
    """Directory whose entries mirror the items of a collection object.

    populate() subscribes to the collection so that later ADD, DEL and
    MOD notifications are applied to this directory by on_event().
    """

    def __init__(self, parent_inode, inodes, collection):
        """Create the directory.

        :collection: the collection (or subcollection) object backing this
        directory; CollectionDirectory passes None and assigns it later
        """
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Add `item` to this directory under the sanitized form of `name`.

        A RichCollectionBase item becomes a nested CollectionDirectoryBase
        that is populated immediately; anything else becomes a
        FuseArvadosFile.
        """
        name = sanitize_filename(name)
        if isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))

    def on_event(self, event, collection, name, item):
        """Apply a change notification from the subscribed collection.

        Notifications for a collection other than self.collection are
        ignored.  The change is applied while holding llfuse.lock.

        Raises KeyError if a DEL or MOD event names an entry this
        directory does not have.
        """
        # Diagnostic trace only, so it is logged at debug level.
        _logger.debug("Got event! %s %s %s %s", event, collection, name, item)
        if collection == self.collection:
            # Entries are stored under their sanitized names (see
            # new_entry), so use the same form for lookups and kernel
            # invalidation.
            name = sanitize_filename(name)
            with llfuse.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    ent = self._entries[name]
                    # Forget the name as well; otherwise the deleted entry
                    # keeps being listed and looked up.
                    del self._entries[name]
                    llfuse.invalidate_entry(self.inode, name)
                    self.inodes.del_entry(ent)
                elif event == arvados.collection.MOD:
                    ent = self._entries[name]
                    llfuse.invalidate_entry(self.inode, name)
                    llfuse.invalidate_inode(ent.inode)

    def populate(self, mtime):
        """Record `mtime`, subscribe to the collection and add all its items."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry, item in self.collection.items():
            self.new_entry(entry, item, self.mtime())

    def writable(self):
        """Report whatever the backing collection reports."""
        return self.collection.writable()
207
208
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree holding a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create the directory; the collection itself is loaded by update().

        :api: API client; update() also uses its `keep` attribute

        :num_retries: retry count passed to the collection constructors

        :collection_record: either a dict with a 'uuid' key (its
        'modified_at' value, if any, becomes the mtime) or a bare locator
        (a uuid or portable data hash string), or None

        :explicit_collection: not used by this class
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_object_file = None
        self.collection_object = None
        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so that writable() cannot raise
        # AttributeError for a directory created without a locator.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)

    def same(self, i):
        """Return True if record `i` refers to this directory's collection.

        `i` must have 'uuid' and 'portable_data_hash' keys; either one
        matching the locator counts.
        """
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Ask the loaded collection, or fall back on the locator-based guess.

        Before a collection is loaded, the directory counts as writable
        only when the locator matches the uuid pattern.
        """
        return self.collection.writable() if self.collection else self._writable

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_object = None
        self.update()

    def new_collection(self, new_collection_object, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        :new_collection_object: the collection record (dict) describing
        the new contents

        :coll_reader: the collection object to populate the directory from
        """
        # Only a directory that already has an inode can have entries the
        # kernel knows about, so only then is there anything to clear.
        if self.inode:
            self.clear(force=True)

        self.collection_object = new_collection_object

        if self.collection_object:
            self._mtime = convertTime(self.collection_object.get('modified_at'))

            if self.collection_object_file is not None:
                self.collection_object_file.update(self.collection_object)

        self.collection = coll_reader
        self.populate(self.mtime())

    def update(self):
        """Load or refresh the collection behind this directory.

        Returns True on success (including the cases where there is
        nothing to do) and False if any error occurred; errors are logged,
        not raised.
        """
        try:
            # A collection that is already loaded and addressed by
            # portable data hash is never fetched again.
            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            # The fetch below runs without the llfuse lock held.
            with llfuse.lock_released:
                if uuid_pattern.match(self.collection_locator):
                    coll_reader = arvados.collection.Collection(
                        self.collection_locator, self.api, self.api.keep,
                        num_retries=self.num_retries)
                else:
                    coll_reader = arvados.collection.CollectionReader(
                        self.collection_locator, self.api, self.api.keep,
                        num_retries=self.num_retries)
                new_collection_object = coll_reader.api_response() or {}
                # If the Collection only exists in Keep, there will be no API
                # response.  Fill in the fields we need.
                if 'uuid' not in new_collection_object:
                    new_collection_object['uuid'] = self.collection_locator
                if "portable_data_hash" not in new_collection_object:
                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
                if 'manifest_text' not in new_collection_object:
                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
            # end with llfuse.lock_released, re-acquire lock

            # Rebuild the directory tree only when the content changed.
            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                self.new_collection(new_collection_object, coll_reader)

            self._manifest_size = len(coll_reader.manifest_text())
            _logger.debug("%s manifest_size %i", self, self._manifest_size)

            self.fresh()
            return True
        except arvados.errors.NotFoundError:
            _logger.exception("arv-mount %s: error", self.collection_locator)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        return False

    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file.

        The ObjectFile exposing the collection record is created and
        registered in the inode table on first access.
        """
        self.checkupdate()
        if item == '.arvados#collection':
            if self.collection_object_file is None:
                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
                self.inodes.add_entry(self.collection_object_file)
            return self.collection_object_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """'.arvados#collection' always exists; other names use the base lookup."""
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached record and record file, then invalidate as usual."""
        self.collection_object = None
        self.collection_object_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """A collection directory counts as persisted once it has a locator."""
        return (self.collection_locator is not None)

    def objsize(self):
        # Manifest length scaled by a fixed factor of 128; presumably a
        # rough estimate of the in-memory footprint -- TODO confirm.
        return self._manifest_size * 128
335
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries):
        """Create the directory; `api` and `num_retries` are handed on to
        every CollectionDirectory created by a lookup."""
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries

    def __setattr__(self, name, value):
        """Set the attribute; on inode assignment also create the fixed entries."""
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries))

    def __contains__(self, k):
        """Return True if `k` is an entry, loading the collection on demand.

        A name that is not yet an entry but looks like a portable data
        hash or a uuid is tried as a collection locator.  If the
        collection loads, it becomes an entry.  Any failure is logged at
        debug level and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        candidate = None
        try:
            candidate = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))

            if candidate.update():
                self._entries[k] = candidate
                return True
        except Exception as err:
            _logger.debug('arv-mount exception keep %s', err)

        # The lookup failed: take the candidate back out of the inode table
        # so that failed lookups do not accumulate orphaned inodes.
        if candidate is not None:
            self.inodes.del_entry(candidate)
        return False

    def __getitem__(self, item):
        """Return the entry for `item`, loading it on demand.

        Raises KeyError if `item` is neither an existing entry nor a
        loadable collection.
        """
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        # Intentionally a no-op.  It returns None, which Directory.clear()
        # in a parent treats as "not cleared".
        pass
403
404
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates every current entry."""

    def invalidate(self):
        """Invalidate this directory and then each of its entries.

        Any exception is logged with its traceback and not re-raised.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception:
            # Logger.exception() requires a message; calling it without one
            # raises TypeError and hides the original error.
            _logger.exception("arv-mount recursive invalidate error")
413
414
class TagsDirectory(RecursiveInvalidateDirectory):
    """Special directory with one subdirectory for each tag visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        """Create the directory; polling is always enabled, every `poll_time`."""
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Fetch the distinct tag names and merge them into the entries."""
        # The API request is made without the llfuse lock held.
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                )
            tags = request.execute(num_retries=self.num_retries)

        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def is_same(existing, link):
            return existing.tag == link['name']

        def make_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, link['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, is_same, make_directory)
436
437
class TagDirectory(Directory):
    """Special directory with one subdirectory for each collection, visible
    to the user, that carries a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        """Create the directory for the tag named `tag`."""
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        """Fetch the collections carrying this tag and merge them in."""
        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]

        # The API request is made without the llfuse lock held.
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=tag_filters,
                select=['head_uuid']
                )
            taggedcollections = request.execute(num_retries=self.num_retries)

        def entry_name(link):
            return link['head_uuid']

        def is_same(existing, link):
            return existing.collection_locator == link['head_uuid']

        def make_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, link['head_uuid'])

        self.merge(taggedcollections['items'], entry_name, is_same, make_directory)
464
465
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Create the directory.

        :project_object: record (dict) of the project; its 'uuid' value
        identifies the contents to list.  update() refetches the record
        when the uuid matches the group or user pattern.
        """
        super(ProjectDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time

    def createDirectory(self, i):
        """Build the entry object for contents record `i`.

        Collections and collection links become a CollectionDirectory,
        groups become a ProjectDirectory, and any other record with a
        uuid becomes an ObjectFile.  Returns None for records that should
        not appear in the directory.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # NOTE(review): this passes parent_inode where the other
            # branches pass self.inode -- confirm that is intended.
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def update(self):
        """Refetch the project record and contents, then merge the entries."""
        # The file exposing the project record is created once, on the
        # first update.
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            """Return the entry name for record `i`, or None to skip it."""
            if 'name' not in i:
                return None
            if i['name'] is None or len(i['name']) == 0:
                return None
            if collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                # collection or subproject
                return i['name']
            if link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                # name link
                return i['name']
            if 'kind' in i and i['kind'].startswith('arvados#'):
                # something else: append the kind, minus its 'arvados#'
                # prefix, as an extension
                return "{}.{}".format(i['name'], i['kind'][8:])
            # nothing matched: the record gets no entry
            return None

        def samefn(a, i):
            """Return True if existing entry `a` still represents record `i`."""
            if isinstance(a, CollectionDirectory):
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                # a stale object file is rebuilt rather than reused
                return a.uuid == i['uuid'] and not a.stale()
            return False

        # The API requests are made without the llfuse lock held.
        with llfuse.lock_released:
            if group_uuid_pattern.match(self.uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.uuid):
                self.project_object = self.api.users().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)

            contents = arvados.util.list_all(self.api.groups().contents,
                                             self.num_retries, uuid=self.uuid)

        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   namefn,
                   samefn,
                   self.createDirectory)

    def __getitem__(self, item):
        """Look up an entry; '.arvados#project' yields the record file."""
        self.checkupdate()
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        """'.arvados#project' always exists; other names use the base lookup."""
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)

    def persisted(self):
        """A project directory reports itself as not persisted."""
        return False

    def objsize(self):
        # Number of keys in the project record times 1024; presumably a
        # rough footprint estimate -- TODO confirm.
        return len(self.project_object) * 1024 if self.project_object else 0
561
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Create the directory and look up the current user.

        :exclude: accepted but not used by this class

        :poll: accepted, but polling is always enabled regardless

        The current user's record is fetched here with an API call.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Rebuild the listing of project roots shared with the current user.

        An exception raised while merging the entries is logged and not
        re-raised.  Exceptions from the API requests themselves propagate.
        """
        # The API requests are made without the llfuse lock held.
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            # A "root" is a project owned by neither the current user nor
            # another project in the list.
            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            # Look the owners up both as users and as groups.
            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            # Each owner that could be resolved is listed by name; an owner
            # record with a first_name is also listed as "first last".
            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if "name" in obr:
                        contents[obr["name"]] = obr
                    if "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            # Roots whose owner could not be resolved are listed directly.
            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message; calling it without one
            # raises TypeError and hides the original error.
            _logger.exception("arv-mount shared dir error")