11446220b0d43fe3e023668244a0d86fd413e601
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7
8 from fusefile import StringFile, StreamReaderFile, ObjectFile
9 from fresh import FreshBase, convertTime
10
11 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
12
13 _logger = logging.getLogger('arvados.arvados_fuse')
14
15
# Pattern for the characters that neither FUSE nor Linux can carry inside
# a filename. (A collection filename containing one of them shows up in
# the fuse mount with an underscore in its place.)
_disallowed_filename_characters = re.compile('[\x00/]')

def sanitize_filename(dirty):
    """Return a version of `dirty` that is usable as a directory entry name.

    None is passed through unchanged.  The empty string and the reserved
    names "." and ".." become "_", "_" and "__" respectively.  For any
    other name, each disallowed character is replaced with "_".
    """
    if dirty is None:
        return None

    # Names that are unusable as a whole, paired with their stand-ins.
    whole_name_substitutes = (('', '_'), ('.', '_'), ('..', '__'))
    for reserved, substitute in whole_name_substitutes:
        if dirty == reserved:
            return substitute

    return _disallowed_filename_characters.sub('_', dirty)
33
34
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    merge() and clear() use self.inodes (an object providing add_entry()
    and del_entry()).  This base class does not assign it; the subclasses
    in this module set it in their constructors.
    """

    def __init__(self, parent_inode):
        """Create an empty directory.

        :parent_inode: the integer inode number of the parent directory

        Raises Exception if parent_inode is not an int.
        """
        super(Directory, self).__init__()

        # No inode number yet; it is assigned from outside this class.
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self._entries = {}
        self._mtime = time.time()

    #  Overriden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def checkupdate(self):
        """Call update() if the directory is stale.

        API HTTP errors are logged at debug level and otherwise ignored, so
        the existing entries keep being served.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.debug(e)

    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    def items(self):
        self.checkupdate()
        return self._entries.items()

    def __iter__(self):
        self.checkupdate()
        # iter() over the dict yields its keys, like iterkeys() did, and
        # is not tied to Python 2.
        return iter(self._entries)

    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        The directory's mtime is bumped when an entry was added or removed,
        and the directory is marked fresh in every case.
        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # Whatever is still in oldentries was not found in 'items' (or was
        # replaced by a new entry of the same name): drop it.
        for name in oldentries:
            llfuse.invalidate_entry(self.inode, str(name))
            self.inodes.del_entry(oldentries[name])
            changed = True

        if changed:
            self._mtime = time.time()

        self.fresh()

    def clear(self):
        """Delete all entries, clearing subdirectories recursively."""
        oldentries = self._entries
        self._entries = {}
        for name, entry in oldentries.items():
            # The type check must look at the entry object, not at its
            # name (the dict key), otherwise nested directories are never
            # cleared and their entries stay registered.
            if isinstance(entry, Directory):
                # A plain Directory may never have been given an inode
                # table of its own; lend it ours so that its clear() can
                # release the nested entries.
                if getattr(entry, 'inodes', None) is None:
                    entry.inodes = self.inodes
                entry.clear()
            llfuse.invalidate_entry(self.inode, str(name))
            self.inodes.del_entry(entry)
        llfuse.invalidate_inode(self.inode)
        self.invalidate()

    def mtime(self):
        """Return the time of the last change to the set of entries."""
        return self._mtime
149
150
class CollectionDirectory(Directory):
    """Represents the root of a directory tree holding a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection):
        """Create a directory for a collection.

        :parent_inode: integer inode number of the parent directory
        :inodes: inode table used to register and remove entries
        :api: API client; its 'keep' attribute is used as the Keep client
        :num_retries: retry count passed to CollectionReader
        :collection: either a collection record (a dict with 'uuid' and
        optionally 'modified_at') or a collection locator
        """
        super(CollectionDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        # ObjectFile served as '.arvados#collection'; created on first access.
        self.collection_object_file = None
        # Record of the collection currently shown; None until update() succeeds.
        self.collection_object = None
        if isinstance(collection, dict):
            self.collection_locator = collection['uuid']
            self._mtime = convertTime(collection.get('modified_at'))
        else:
            self.collection_locator = collection
            self._mtime = 0

    def same(self, i):
        """Return whether record i refers to this directory's collection."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_object = None
        self.update()

    def new_collection(self, new_collection_object, coll_reader):
        """Replace the directory tree with the contents of coll_reader.

        :new_collection_object: the collection record to remember
        :coll_reader: reader whose streams and files populate the tree
        """
        self.collection_object = new_collection_object

        self._mtime = convertTime(self.collection_object.get('modified_at'))

        if self.collection_object_file is not None:
            self.collection_object_file.update(self.collection_object)

        self.clear()
        for s in coll_reader.all_streams():
            cwd = self
            # Walk (creating as needed) one directory per stream path part.
            for part in s.name().split('/'):
                if part != '' and part != '.':
                    partname = sanitize_filename(part)
                    if partname not in cwd._entries:
                        subdir = Directory(cwd.inode)
                        # Directory does not set an inode table itself, but
                        # its merge()/clear() need one.
                        subdir.inodes = self.inodes
                        cwd._entries[partname] = self.inodes.add_entry(subdir)
                    cwd = cwd._entries[partname]
            for k, v in s.files().items():
                cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))

    def update(self):
        """Reload the collection and rebuild the tree if its content changed.

        Returns True on success and False if loading failed; failures are
        logged rather than raised.
        """
        try:
            # Check for a missing locator first: the pattern match below
            # cannot be applied to None.
            if self.collection_locator is None:
                self.fresh()
                return True

            # A collection addressed by portable data hash does not need
            # to be loaded a second time.
            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            with llfuse.lock_released:
                coll_reader = arvados.CollectionReader(
                    self.collection_locator, self.api, self.api.keep,
                    num_retries=self.num_retries)
                new_collection_object = coll_reader.api_response() or {}
                # If the Collection only exists in Keep, there will be no API
                # response.  Fill in the fields we need.
                if 'uuid' not in new_collection_object:
                    new_collection_object['uuid'] = self.collection_locator
                if "portable_data_hash" not in new_collection_object:
                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
                if 'manifest_text' not in new_collection_object:
                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
                coll_reader.normalize()
            # end with llfuse.lock_released, re-acquire lock

            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                self.new_collection(new_collection_object, coll_reader)

            self.fresh()
            return True
        except arvados.errors.NotFoundError:
            _logger.exception("arv-mount %s: error", self.collection_locator)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        return False

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#collection':
            # The record file is created on first access and is not kept
            # in _entries.
            if self.collection_object_file is None:
                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
                self.inodes.add_entry(self.collection_object_file)
            return self.collection_object_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)
259
260
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries):
        """Create the directory.

        :parent_inode: integer inode number of the parent directory
        :inodes: inode table used to register and remove entries
        :api: API client handed to the CollectionDirectory objects created here
        :num_retries: retry count handed to those objects
        """
        super(MagicDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries))

    def __contains__(self, k):
        """Return whether k names an entry, loading the collection if needed.

        A name that is not yet an entry but looks like a portable data hash
        or a uuid is looked up as a collection; it becomes an entry only if
        loading it succeeds.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        candidate = None
        try:
            candidate = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))
            if candidate.update():
                self._entries[k] = candidate
                return True
        except Exception as err:
            _logger.debug('arv-mount exception keep %s', err)

        # The lookup failed: take the provisional entry out of the inode
        # table again, otherwise every failed lookup leaves one behind.
        if candidate is not None:
            try:
                self.inodes.del_entry(candidate)
            except Exception as err:
                _logger.debug('arv-mount exception keep %s', err)
        return False

    def __getitem__(self, item):
        """Return the entry named item; raise KeyError if there is none."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)
325
326
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates every entry it holds."""

    def invalidate(self):
        """Invalidate this directory and all of its entries.

        The directory with the root inode acquires llfuse.lock for the
        duration of the call; other directories do not touch the lock.
        Errors are logged instead of raised.
        """
        # NOTE(review): presumably only the root is invalidated from outside
        # a request handler, with nested calls arriving under the lock the
        # root already took -- confirm against the callers.
        is_root = (self.inode == llfuse.ROOT_INODE)
        if is_root:
            llfuse.lock.acquire()
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # without one raises TypeError from inside this handler.
            _logger.exception("arv-mount: error invalidating directory")
        finally:
            if is_root:
                llfuse.lock.release()
340
341
class TagsDirectory(RecursiveInvalidateDirectory):
    """Special directory holding one subdirectory per tag visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        """Remember the inode table, the API client and the polling settings."""
        super(TagsDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self._poll_time = poll_time
        # Polling is always switched on for this directory.
        self._poll = True

    def update(self):
        """Fetch the distinct tag names and merge them into the entries."""

        def tag_name(item):
            return item['name']

        def is_same_tag(existing, item):
            return existing.tag == item['name']

        def make_tag_directory(item):
            return TagDirectory(self.inode, self.inodes, self.api,
                                self.num_retries, item['name'],
                                poll=self._poll, poll_time=self._poll_time)

        # The API call runs with the llfuse lock released.
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'],
                distinct=True)
            tags = request.execute(num_retries=self.num_retries)

        # A response without "items" leaves the directory untouched.
        if "items" not in tags:
            return
        self.merge(tags['items'], tag_name, is_same_tag, make_tag_directory)
364
365
class TagDirectory(Directory):
    """Special directory with one subdirectory for each collection visible to
    the user that carries a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        """Remember the tag name, the API client and the polling settings."""
        super(TagDirectory, self).__init__(parent_inode)
        self.tag = tag
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        """Fetch the collections tagged with self.tag and merge them in."""

        def entry_name(link):
            return link['head_uuid']

        def is_same_collection(existing, link):
            return existing.collection_locator == link['head_uuid']

        def make_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api,
                                       self.num_retries, link['head_uuid'])

        tag_filters = [['link_class', '=', 'tag'],
                       ['name', '=', self.tag],
                       ['head_uuid', 'is_a', 'arvados#collection']]

        # The API call runs with the llfuse lock released.
        with llfuse.lock_released:
            request = self.api.links().list(filters=tag_filters,
                                            select=['head_uuid'])
            taggedcollections = request.execute(num_retries=self.num_retries)

        self.merge(taggedcollections['items'],
                   entry_name,
                   is_same_collection,
                   make_collection_directory)
393
394
class ProjectDirectory(Directory):
    """A special directory that contains the contents of a project."""

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        """Create a directory for a project.

        :parent_inode: integer inode number of the parent directory
        :inodes: inode table used to register and remove entries
        :api: API client
        :num_retries: retry count for API requests
        :project_object: record of the project (or user); must have 'uuid'
        :poll, poll_time: polling settings, also handed on to subprojects
        """
        super(ProjectDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        # ObjectFile served as '.arvados#project'; created on demand.
        self.project_object_file = None
        self.uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time

    def _ensure_project_object_file(self):
        """Create and register the '.arvados#project' file if it is missing."""
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

    def createDirectory(self, i):
        """Build the directory entry object for the listed item i.

        Returns a CollectionDirectory, ProjectDirectory or ObjectFile
        depending on the kind of uuid, or None if the item gets no entry.
        """
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            # NOTE(review): this passes parent_inode while every other
            # entry created here gets self.inode -- confirm it is intended.
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def update(self):
        """Reload the project record and its contents, then merge them in."""
        self._ensure_project_object_file()

        def namefn(i):
            """Return the entry name for item i, or None to skip it."""
            if 'name' in i:
                if i['name'] is None or len(i['name']) == 0:
                    return None
                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                    # collection or subproject
                    return i['name']
                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                    # name link
                    return i['name']
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    # something else: append the kind without its
                    # 'arvados#' prefix
                    return "{}.{}".format(i['name'], i['kind'][8:])
            # Unnamed items, and named items matching none of the cases
            # above, get no entry.
            return None

        def samefn(a, i):
            """Return whether existing entry a still represents item i."""
            if isinstance(a, CollectionDirectory):
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid == i['uuid'] and not a.stale()
            return False

        with llfuse.lock_released:
            if group_uuid_pattern.match(self.uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.uuid):
                self.project_object = self.api.users().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)

            contents = arvados.util.list_all(self.api.groups().contents,
                                             self.num_retries, uuid=self.uuid)
            # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
            contents += arvados.util.list_all(
                self.api.links().list, self.num_retries,
                filters=[['tail_uuid', '=', self.uuid],
                         ['link_class', '=', 'name']])

        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   namefn,
                   samefn,
                   self.createDirectory)

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#project':
            # __contains__ always reports this name, so make sure there is
            # a file to return even if update() has not created it.
            self._ensure_project_object_file()
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)
490
491
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Create the directory and look up the current user.

        :parent_inode: integer inode number of the parent directory
        :inodes: inode table used to register and remove entries
        :api: API client; queried here for the current user
        :num_retries: retry count for API requests
        :exclude: accepted for interface compatibility; not used
        :poll: accepted for interface compatibility; polling is always on
        :poll_time: polling interval, handed on to the project directories
        """
        super(SharedDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        """Rebuild the entries from the projects not owned by the current user.

        A "root" is a project whose owner is neither the current user nor
        another listed project.  The owners of the roots that can be read
        become entries, named by 'name' and/or by first and last name.  A
        root whose owner cannot be read becomes an entry itself.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            # An owner may be a user or a group, so ask for both.
            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            # Maps entry name to the record it stands for.
            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if "name" in obr:
                        contents[obr["name"]] = obr
                    if "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Logger.exception() requires a message argument; calling it
            # without one raises TypeError from inside this handler.
            _logger.exception("arv-mount: error updating shared directory")