a5d79c948694791d9feb1caf288e20fe80696315
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future.utils import viewitems
8 from future.utils import itervalues
9 from builtins import dict
10 import apiclient
11 import arvados
12 import errno
13 import functools
14 import llfuse
15 import logging
16 import re
17 import sys
18 import threading
19 import time
20 from apiclient import errors as apiclient_errors
21
22 from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
23 from .fresh import FreshBase, convertTime, use_counter, check_update
24
25 import arvados.collection
26 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
27
# Module-level logger used by the directory classes defined in this file.
_logger = logging.getLogger('arvados.arvados_fuse')


# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
# The character class below covers exactly two characters: NUL and '/'.
_disallowed_filename_characters = re.compile('[\x00/]')
35
36
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    """

    def __init__(self, parent_inode, inodes, apiconfig):
        """Create an empty directory.

        :parent_inode: the integer inode number of the parent directory

        :inodes: the inode table object; this class calls its add_entry,
        del_entry, touch, invalidate_entry and invalidate_inode methods

        :apiconfig: callable returning the cluster configuration mapping
        (see forward_slash_subst)

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self.apiconfig = apiconfig
        self._entries = {}
        self._mtime = time.time()

    def forward_slash_subst(self):
        """Return the string substituted for '/' in entry names.

        The value is read from the cluster configuration once and cached
        on the instance.  Returns '_' if the configuration has no
        ForwardSlashNameSubstitution setting, and None if the setting is
        '' or '/' (i.e. no substitution should be performed).
        """
        if not hasattr(self, '_fsns'):
            self._fsns = None
            config = self.apiconfig()
            try:
                self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
            except KeyError:
                # old API server with no FSNS config
                self._fsns = '_'
            else:
                if self._fsns == '' or self._fsns == '/':
                    self._fsns = None
        return self._fsns

    def unsanitize_filename(self, incoming):
        """Replace ForwardSlashNameSubstitution value with /"""
        fsns = self.forward_slash_subst()
        if isinstance(fsns, str):
            return incoming.replace(fsns, '/')
        else:
            return incoming

    def sanitize_filename(self, dirty):
        """Replace disallowed filename characters according to
        ForwardSlashNameSubstitution in self.apiconfig.

        Returns None for None.  The names '', '.' and '..' are mapped to
        '_', '_' and '__' respectively.  Otherwise '/' is replaced by the
        substitution string (when one is configured) and any remaining
        disallowed character is replaced by '_'.
        """
        # '.' and '..' are not reachable if API server is newer than #6277
        if dirty is None:
            return None
        elif dirty == '':
            return '_'
        elif dirty == '.':
            return '_'
        elif dirty == '..':
            return '__'
        else:
            fsns = self.forward_slash_subst()
            if isinstance(fsns, str):
                dirty = dirty.replace('/', fsns)
            return _disallowed_filename_characters.sub('_', dirty)


    #  Overridden by subclasses to implement logic to update the
    #  entries dict when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        """Return False; the generic directory reports itself as not persisted."""
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An HttpError raised by update() is logged as a warning and not
        propagated.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias; use Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list of (name, entry) pairs (a copy, not a view)."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Touch this directory in the inode table, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = self.sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    # create new directory entry
                    ent = new_entry(i)
                    if ent is not None:
                        self._entries[name] = self.inodes.add_entry(ent)
                        changed = True

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self, i)
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self)
            self._mtime = time.time()

        self.fresh()

    def in_use(self):
        """Return True if this directory or any of its entries is in use."""
        if super(Directory, self).in_use():
            return True
        return any(v.in_use() for v in itervalues(self._entries))

    def has_ref(self, only_children):
        """Return True if this directory or any of its entries has a reference.

        only_children is forwarded to the base class check for this
        directory itself; entries are always checked with False.
        """
        if super(Directory, self).has_ref(only_children):
            return True
        return any(v.has_ref(False) for v in itervalues(self._entries))

    def clear(self):
        """Delete all entries"""
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            oldentries[n].clear()
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

    def kernel_invalidate(self):
        """Invalidate the parent's directory entry that refers to this directory."""
        # Invalidating the dentry on the parent implies invalidating all paths
        # below it as well.
        parent = self.inodes[self.parent_inode]

        # Find self on the parent in order to invalidate this path.
        # Calling the public items() method might trigger a refresh,
        # which we definitely don't want, so read the internal dict directly.
        for k,v in viewitems(parent._entries):
            if v is self:
                self.inodes.invalidate_entry(parent, k)
                break

    def mtime(self):
        """Return the stored modification time of this directory."""
        return self._mtime

    def writable(self):
        """Return False; the generic directory is read-only."""
        return False

    def flush(self):
        pass

    # The operations below have no generic implementation and raise
    # NotImplementedError here.

    def want_event_subscribe(self):
        raise NotImplementedError()

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
260
261
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    Used directly for subcollections, and as the base class of
    CollectionDirectory, which adds loading/saving of Collection records.

    Most operations act only on the underlying Arvados `Collection`
    object.  That object reports additions, removals and modifications
    through a notify callback (`CollectionDirectoryBase.on_event`), and
    FUSE inodes and directory entries are created, deleted or invalidated
    in response.

    """

    def __init__(self, parent_inode, inodes, apiconfig, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
        self.collection = collection
        self.apiconfig = apiconfig

    def new_entry(self, name, item, mtime):
        """Add a directory entry called `name` for collection item `item`.

        An item that already carries a (dead) fuse_entry is reparented
        here; otherwise a new subdirectory or file entry is created
        depending on the item's type.  The resulting entry is recorded on
        the item as `fuse_entry`.
        """
        name = self.sanitize_filename(name)
        if getattr(item, "fuse_entry", None) is not None:
            previous = item.fuse_entry
            if previous.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if previous.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            previous.dead = False
            self._entries[name] = previous
        elif isinstance(item, arvados.collection.RichCollectionBase):
            subdir = CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item)
            self._entries[name] = self.inodes.add_entry(subdir)
            self._entries[name].populate(mtime)
        else:
            leaf = FuseArvadosFile(self.inode, item, mtime)
            self._entries[name] = self.inodes.add_entry(leaf)
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Collection notify callback: mirror ADD/DEL/MOD events in the mount.

        Events for any collection other than this directory's own are
        ignored.  The work is done while holding llfuse.lock.
        """
        if not (collection == self.collection):
            return

        name = self.sanitize_filename(name)
        _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
        with llfuse.lock:
            if event == arvados.collection.ADD:
                self.new_entry(name, item, self.mtime())
            elif event == arvados.collection.DEL:
                removed = self._entries.pop(name)
                self.inodes.invalidate_entry(self, name)
                self.inodes.del_entry(removed)
            elif event == arvados.collection.MOD:
                if getattr(item, "fuse_entry", None) is not None:
                    self.inodes.invalidate_inode(item.fuse_entry)
                elif name in self._entries:
                    self.inodes.invalidate_inode(self._entries[name])

    def populate(self, mtime):
        """Set the mtime, subscribe to collection events and add all items."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry_name, entry_item in viewitems(self.collection):
            self.new_entry(entry_name, entry_item, self.mtime())

    def writable(self):
        """Report whether the underlying collection is writable."""
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with llfuse.lock released meanwhile."""
        with llfuse.lock_released:
            root = self.collection.root_collection()
            root.save()

    @use_counter
    @check_update
    def create(self, name):
        """Create an empty file `name` by opening it for writing and closing it."""
        with llfuse.lock_released:
            handle = self.collection.open(name, "w")
            handle.close()

    @use_counter
    @check_update
    def mkdir(self, name):
        """Create directory `name` in the collection."""
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        """Remove `name` from the collection, then flush."""
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move `name_old` from directory `src` to `name_new` in this directory.

        Raises llfuse.FUSEError with EPERM if `src` is not a collection
        directory, ENOTEMPTY when replacing a non-empty directory with a
        directory, ENOTDIR when replacing a file with a directory, and
        EISDIR when replacing a directory with a file.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            moving_file = isinstance(ent, FuseArvadosFile)
            moving_dir = isinstance(ent, CollectionDirectoryBase)
            onto_file = isinstance(tgt, FuseArvadosFile)
            onto_dir = isinstance(tgt, CollectionDirectoryBase)
            if moving_file and onto_file:
                # file replacing file: always allowed
                pass
            elif moving_dir and onto_dir:
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif moving_dir and onto_file:
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif moving_file and onto_dir:
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()

    def clear(self):
        """Delete all entries and drop the reference to the collection."""
        super(CollectionDirectoryBase, self).clear()
        self.collection = None
383
384
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection."""

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
        """Create a collection root directory.

        :api: API client; its `config`, `keep` and `_rootDesc` attributes
        are used

        :num_retries: passed to the Collection objects created in update()

        :collection_record: either a dict (its 'uuid' and 'modified_at'
        fields are read) or a locator string, or None

        :explicit_collection: accepted for interface compatibility; not
        used by this class
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        self._poll = True
        try:
            self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
        except Exception:
            # Catch Exception rather than using a bare except, so that
            # KeyboardInterrupt/SystemExit are not swallowed here.
            _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0])
            self._poll_time = 60*60

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Always define _writable so writable() cannot raise AttributeError
        # when no locator was supplied; without a locator it is read-only.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    def same(self, i):
        """Return True if record `i` has this locator as its uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        """Return the loaded collection's writability, or the locator-based
        guess made in __init__ when no collection is loaded."""
        return self.collection.writable() if self.collection is not None else self._writable

    def want_event_subscribe(self):
        """Return True when the locator matches the uuid pattern."""
        return (uuid_pattern.match(self.collection_locator) is not None)

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with those of `coll_reader`.

        Existing entries are cleared first if this directory already has
        an inode.  When a record is given, the mtime and locator are taken
        from it and the '.arvados#collection' file, if it exists, is
        updated with it.
        """
        if self.inode:
            self.clear()

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        """Return the collection locator."""
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection backing this directory.

        :to_record_version: passed to the collection's
        known_past_version(); when that returns true the collection is not
        re-fetched.

        Returns True when the directory is up to date afterwards
        (including when there was nothing to do), and False when an error
        occurred, in which case the error is logged and the directory is
        invalidated.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another thread refreshed this directory while we
                        # waited for the lock.  That is a success, so
                        # report True like every other success path
                        # (callers test the result for truthiness).
                        return True

                    _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        # Only reached after one of the handled errors above.
        self.invalidate()
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields a file exposing
        the collection record, created on first access."""
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        # The special record file is always considered present.
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Forget the cached record and record file, then invalidate."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        """Return True when this directory has a collection locator."""
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()

    def clear(self):
        """Stop the collection's threads, delete all entries and reset the
        recorded manifest size."""
        if self.collection is not None:
            self.collection.stop_threads()
        super(CollectionDirectory, self).clear()
        self._manifest_size = 0
557
558
class TmpCollectionDirectory(CollectionDirectoryBase):
    """A directory backed by an Arvados collection that never gets saved.

    This lets Keep be used as scratch space.  A userspace program can
    read the .arvados#collection file to obtain a current manifest, in
    order to save a snapshot of the scratch data or use it as a crunch
    job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save operations do nothing."""

        def save(self):
            """Do nothing."""

        def save_new(self):
            """Do nothing."""

    def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep,
            num_retries=num_retries,
            storage_classes_desired=storage_classes)
        super(TmpCollectionDirectory, self).__init__(
            parent_inode, inodes, api_client.config, scratch)
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the event as the base class does, then invalidate the
        collection record file if one has been created."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        if not self.collection_record_file:
            return
        with llfuse.lock:
            self.collection_record_file.invalidate()
        self.inodes.invalidate_inode(self.collection_record_file)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Return a record dict built from the collection's current
        manifest text and portable data hash (uuid is None)."""
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
            return {
                "uuid": None,
                "manifest_text": manifest,
                "portable_data_hash": pdh,
            }

    def __contains__(self, k):
        # The special record file is always considered present.
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        """Look up an entry; '.arvados#collection' yields the record file,
        which is created on first access."""
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            record_file = FuncToJSONFile(self.inode, self.collection_record)
            self.collection_record_file = record_file
            self.inodes.add_entry(self.collection_record_file)
        return self.collection_record_file

    def persisted(self):
        """Always False."""
        return False

    def writable(self):
        """Always True."""
        return True

    def want_event_subscribe(self):
        """Always False."""
        return False

    def finalize(self):
        """Stop the collection's threads."""
        self.collection.stop_threads()

    def invalidate(self):
        """Invalidate the record file, if any, then this directory."""
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
631
632
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
(in the form 'zzzzz-j7d0g-1234567890abcde').

Note that this directory will appear empty until you attempt to access a
specific collection or project subdirectory (such as trying to 'cd' into it),
at which point the collection or project will actually be looked up on the server
and the directory will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
        """Create the directory.

        :pdh_only: when true, only names matching the portable data hash
        pattern are looked up (uuids are rejected)

        :storage_classes: forwarded to the ProjectDirectory objects created
        on lookup, and to the by_id subdirectory
        """
        super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only
        self.storage_classes = storage_classes

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            # "Identical" includes storage_classes, so that projects reached
            # through by_id get the same setting as those reached directly.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only,
                        storage_classes=self.storage_classes))

    def __contains__(self, k):
        """Return True if `k` is, or can be loaded as, an entry.

        Names that are not already present are looked up on the server
        when they match the portable data hash pattern or (unless
        pdh_only) the uuid pattern: group uuids as projects, anything
        else as a collection.  A successful lookup adds the entry.  Any
        exception during lookup is logged and reported as False.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Defined before the try block so the except clause can always
        # inspect it.
        e = None
        try:
            if group_uuid_pattern.match(k):
                project = self.api.groups().list(
                    filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
                if project[u'items_available'] == 0:
                    return False
                e = self.inodes.add_entry(ProjectDirectory(
                    self.inode, self.inodes, self.api, self.num_retries,
                    project[u'items'][0], storage_classes=self.storage_classes))
            else:
                e = self.inodes.add_entry(CollectionDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, k))

            if e.update():
                if k not in self._entries:
                    self._entries[k] = e
                else:
                    # An entry for k appeared in the meantime; discard ours.
                    self.inodes.del_entry(e)
                return True
            else:
                self.inodes.invalidate_entry(self, k)
                self.inodes.del_entry(e)
                return False
        except Exception:
            _logger.exception("arv-mount lookup '%s':", k)
            if e is not None:
                self.inodes.del_entry(e)
            return False

    def __getitem__(self, item):
        """Return the entry for `item`, loading it if necessary.

        Raises KeyError if there is no such entry and it cannot be loaded.
        """
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self):
        # Deliberately a no-op: entries are kept.
        pass

    def want_event_subscribe(self):
        """Return True unless this directory is restricted to portable data hashes."""
        return not self.pdh_only
725
726
class TagsDirectory(Directory):
    """Special directory with one subdirectory per tag visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
        self.api = api
        self.num_retries = num_retries
        self._poll = True
        self._poll_time = poll_time
        # Tag names found by direct lookup in __getitem__; update() adds
        # them to the names returned by the tag listing.
        self._extra = set()

    def want_event_subscribe(self):
        """Always True."""
        return True

    @use_counter
    def update(self):
        """List tag links (up to 1000 distinct names) and merge them,
        together with the names in _extra, into the directory entries."""
        tag_filters = [['link_class', '=', 'tag'], ["name", "!=", ""]]
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=tag_filters,
                select=['name'], distinct=True, limit=1000
                )
            tags = request.execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def entry_name(link):
            return link['name']

        def is_same(existing, link):
            return existing.tag == link['name']

        def make_entry(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries, link['name'], poll=self._poll, poll_time=self._poll_time)

        listing = tags['items']+[{"name": n} for n in self._extra]
        self.merge(listing, entry_name, is_same, make_entry)

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return the entry for tag `item`.

        If the tag is not already listed, the server is asked for a link
        with that name; when one exists the name is remembered in _extra
        and the directory is updated before the lookup is retried.
        """
        base = super(TagsDirectory, self)
        if base.__contains__(item):
            return base.__getitem__(item)
        with llfuse.lock_released:
            request = self.api.links().list(
                filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
            )
            tags = request.execute(num_retries=self.num_retries)
        if tags["items"]:
            self._extra.add(item)
            self.update()
        return base.__getitem__(item)

    @use_counter
    @check_update
    def __contains__(self, k):
        """Return True if `k` is listed or can be found via __getitem__."""
        if super(TagsDirectory, self).__contains__(k):
            return True
        try:
            self[k]
        except KeyError:
            return False
        return True
779
780
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
        self.tag = tag
        self.api = api
        self.num_retries = num_retries
        self._poll_time = poll_time
        self._poll = poll

    def want_event_subscribe(self):
        """Always True for this directory type."""
        return True

    @use_counter
    def update(self):
        """List the tag links named ``self.tag`` that point at collections
        and merge one CollectionDirectory per ``head_uuid`` into the entries.

        The API request runs with the llfuse lock released.
        """
        with llfuse.lock_released:
            tagged = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)

        def entry_name(link):
            return link['head_uuid']

        def same_collection(entry, link):
            return entry.collection_locator == link['head_uuid']

        def new_collection_directory(link):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, link['head_uuid'])

        self.merge(tagged['items'], entry_name, same_collection, new_collection_directory)
811
812
813 class ProjectDirectory(Directory):
814     """A special directory that contains the contents of a project."""
815
816     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
817                  poll=True, poll_time=3, storage_classes=None):
818         super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
819         self.api = api
820         self.num_retries = num_retries
821         self.project_object = project_object
822         self.project_object_file = None
823         self.project_uuid = project_object['uuid']
824         self._poll = poll
825         self._poll_time = poll_time
826         self._updating_lock = threading.Lock()
827         self._current_user = None
828         self._full_listing = False
829         self.storage_classes = storage_classes
830
831     def want_event_subscribe(self):
832         return True
833
834     def createDirectory(self, i):
835         if collection_uuid_pattern.match(i['uuid']):
836             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
837         elif group_uuid_pattern.match(i['uuid']):
838             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
839         elif link_uuid_pattern.match(i['uuid']):
840             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
841                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
842             else:
843                 return None
844         elif uuid_pattern.match(i['uuid']):
845             return ObjectFile(self.parent_inode, i)
846         else:
847             return None
848
849     def uuid(self):
850         return self.project_uuid
851
852     def items(self):
853         self._full_listing = True
854         return super(ProjectDirectory, self).items()
855
856     def namefn(self, i):
857         if 'name' in i:
858             if i['name'] is None or len(i['name']) == 0:
859                 return None
860             elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])):
861                 # collection or subproject
862                 return i['name']
863             elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
864                 # name link
865                 return i['name']
866             elif 'kind' in i and i['kind'].startswith('arvados#'):
867                 # something else
868                 return "{}.{}".format(i['name'], i['kind'][8:])
869         else:
870             return None
871
872
873     @use_counter
874     def update(self):
875         if self.project_object_file == None:
876             self.project_object_file = ObjectFile(self.inode, self.project_object)
877             self.inodes.add_entry(self.project_object_file)
878
879         if not self._full_listing:
880             return True
881
882         def samefn(a, i):
883             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
884                 return a.uuid() == i['uuid']
885             elif isinstance(a, ObjectFile):
886                 return a.uuid() == i['uuid'] and not a.stale()
887             return False
888
889         try:
890             with llfuse.lock_released:
891                 self._updating_lock.acquire()
892                 if not self.stale():
893                     return
894
895                 if group_uuid_pattern.match(self.project_uuid):
896                     self.project_object = self.api.groups().get(
897                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
898                 elif user_uuid_pattern.match(self.project_uuid):
899                     self.project_object = self.api.users().get(
900                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
901                 # do this in 2 steps until #17424 is fixed
902                 contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
903                                                         order_key="uuid",
904                                                         num_retries=self.num_retries,
905                                                         uuid=self.project_uuid,
906                                                         filters=[["uuid", "is_a", "arvados#group"],
907                                                                  ["groups.group_class", "in", ["project","filter"]]]))
908                 contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
909                                                              order_key="uuid",
910                                                              num_retries=self.num_retries,
911                                                              uuid=self.project_uuid,
912                                                              filters=[["uuid", "is_a", "arvados#collection"]]))
913
914             # end with llfuse.lock_released, re-acquire lock
915
916             self.merge(contents,
917                        self.namefn,
918                        samefn,
919                        self.createDirectory)
920             return True
921         finally:
922             self._updating_lock.release()
923
924     def _add_entry(self, i, name):
925         ent = self.createDirectory(i)
926         self._entries[name] = self.inodes.add_entry(ent)
927         return self._entries[name]
928
929     @use_counter
930     @check_update
931     def __getitem__(self, k):
932         if k == '.arvados#project':
933             return self.project_object_file
934         elif self._full_listing or super(ProjectDirectory, self).__contains__(k):
935             return super(ProjectDirectory, self).__getitem__(k)
936         with llfuse.lock_released:
937             k2 = self.unsanitize_filename(k)
938             if k2 == k:
939                 namefilter = ["name", "=", k]
940             else:
941                 namefilter = ["name", "in", [k, k2]]
942             contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
943                                                        ["group_class", "in", ["project","filter"]],
944                                                        namefilter],
945                                               limit=2).execute(num_retries=self.num_retries)["items"]
946             if not contents:
947                 contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
948                                                                 namefilter],
949                                                        limit=2).execute(num_retries=self.num_retries)["items"]
950         if contents:
951             if len(contents) > 1 and contents[1]['name'] == k:
952                 # If "foo/bar" and "foo[SUBST]bar" both exist, use
953                 # "foo[SUBST]bar".
954                 contents = [contents[1]]
955             name = self.sanitize_filename(self.namefn(contents[0]))
956             if name != k:
957                 raise KeyError(k)
958             return self._add_entry(contents[0], name)
959
960         # Didn't find item
961         raise KeyError(k)
962
963     def __contains__(self, k):
964         if k == '.arvados#project':
965             return True
966         try:
967             self[k]
968             return True
969         except KeyError:
970             pass
971         return False
972
973     @use_counter
974     @check_update
975     def writable(self):
976         with llfuse.lock_released:
977             if not self._current_user:
978                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
979             return self._current_user["uuid"] in self.project_object.get("writable_by", [])
980
981     def persisted(self):
982         return True
983
984     @use_counter
985     @check_update
986     def mkdir(self, name):
987         try:
988             with llfuse.lock_released:
989                 c = {
990                     "owner_uuid": self.project_uuid,
991                     "name": name,
992                     "manifest_text": "" }
993                 if self.storage_classes is not None:
994                     c["storage_classes_desired"] = self.storage_classes
995                 try:
996                     self.api.collections().create(body=c).execute(num_retries=self.num_retries)
997                 except Exception as e:
998                     raise
999             self.invalidate()
1000         except apiclient_errors.Error as error:
1001             _logger.error(error)
1002             raise llfuse.FUSEError(errno.EEXIST)
1003
1004     @use_counter
1005     @check_update
1006     def rmdir(self, name):
1007         if name not in self:
1008             raise llfuse.FUSEError(errno.ENOENT)
1009         if not isinstance(self[name], CollectionDirectory):
1010             raise llfuse.FUSEError(errno.EPERM)
1011         if len(self[name]) > 0:
1012             raise llfuse.FUSEError(errno.ENOTEMPTY)
1013         with llfuse.lock_released:
1014             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
1015         self.invalidate()
1016
1017     @use_counter
1018     @check_update
1019     def rename(self, name_old, name_new, src):
1020         if not isinstance(src, ProjectDirectory):
1021             raise llfuse.FUSEError(errno.EPERM)
1022
1023         ent = src[name_old]
1024
1025         if not isinstance(ent, CollectionDirectory):
1026             raise llfuse.FUSEError(errno.EPERM)
1027
1028         if name_new in self:
1029             # POSIX semantics for replacing one directory with another is
1030             # tricky (the target directory must be empty, the operation must be
1031             # atomic which isn't possible with the Arvados API as of this
1032             # writing) so don't support that.
1033             raise llfuse.FUSEError(errno.EPERM)
1034
1035         self.api.collections().update(uuid=ent.uuid(),
1036                                       body={"owner_uuid": self.uuid(),
1037                                             "name": name_new}).execute(num_retries=self.num_retries)
1038
1039         # Acually move the entry from source directory to this directory.
1040         del src._entries[name_old]
1041         self._entries[name_new] = ent
1042         self.inodes.invalidate_entry(src, name_old)
1043
1044     @use_counter
1045     def child_event(self, ev):
1046         properties = ev.get("properties") or {}
1047         old_attrs = properties.get("old_attributes") or {}
1048         new_attrs = properties.get("new_attributes") or {}
1049         old_attrs["uuid"] = ev["object_uuid"]
1050         new_attrs["uuid"] = ev["object_uuid"]
1051         old_name = self.sanitize_filename(self.namefn(old_attrs))
1052         new_name = self.sanitize_filename(self.namefn(new_attrs))
1053
1054         # create events will have a new name, but not an old name
1055         # delete events will have an old name, but not a new name
1056         # update events will have an old and new name, and they may be same or different
1057         # if they are the same, an unrelated field changed and there is nothing to do.
1058
1059         if old_attrs.get("owner_uuid") != self.project_uuid:
1060             # Was moved from somewhere else, so don't try to remove entry.
1061             old_name = None
1062         if ev.get("object_owner_uuid") != self.project_uuid:
1063             # Was moved to somewhere else, so don't try to add entry
1064             new_name = None
1065
1066         if old_attrs.get("is_trashed"):
1067             # Was previously deleted
1068             old_name = None
1069         if new_attrs.get("is_trashed"):
1070             # Has been deleted
1071             new_name = None
1072
1073         if new_name != old_name:
1074             ent = None
1075             if old_name in self._entries:
1076                 ent = self._entries[old_name]
1077                 del self._entries[old_name]
1078                 self.inodes.invalidate_entry(self, old_name)
1079
1080             if new_name:
1081                 if ent is not None:
1082                     self._entries[new_name] = ent
1083                 else:
1084                     self._add_entry(new_attrs, new_name)
1085             elif ent is not None:
1086                 self.inodes.del_entry(ent)
1087
1088
1089 class SharedDirectory(Directory):
1090     """A special directory that represents users or groups who have shared projects with me."""
1091
1092     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
1093                  poll=False, poll_time=60, storage_classes=None):
1094         super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
1095         self.api = api
1096         self.num_retries = num_retries
1097         self.current_user = api.users().current().execute(num_retries=num_retries)
1098         self._poll = True
1099         self._poll_time = poll_time
1100         self._updating_lock = threading.Lock()
1101         self.storage_classes = storage_classes
1102
1103     @use_counter
1104     def update(self):
1105         try:
1106             with llfuse.lock_released:
1107                 self._updating_lock.acquire()
1108                 if not self.stale():
1109                     return
1110
1111                 contents = {}
1112                 roots = []
1113                 root_owners = set()
1114                 objects = {}
1115
1116                 methods = self.api._rootDesc.get('resources')["groups"]['methods']
1117                 if 'httpMethod' in methods.get('shared', {}):
1118                     page = []
1119                     while True:
1120                         resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
1121                                                         order="uuid",
1122                                                         limit=10000,
1123                                                         count="none",
1124                                                         include="owner_uuid").execute()
1125                         if not resp["items"]:
1126                             break
1127                         page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
1128                         for r in resp["items"]:
1129                             objects[r["uuid"]] = r
1130                             roots.append(r["uuid"])
1131                         for r in resp["included"]:
1132                             objects[r["uuid"]] = r
1133                             root_owners.add(r["uuid"])
1134                 else:
1135                     all_projects = list(arvados.util.keyset_list_all(
1136                         self.api.groups().list,
1137                         order_key="uuid",
1138                         num_retries=self.num_retries,
1139                         filters=[['group_class','in',['project','filter']]],
1140                         select=["uuid", "owner_uuid"]))
1141                     for ob in all_projects:
1142                         objects[ob['uuid']] = ob
1143
1144                     current_uuid = self.current_user['uuid']
1145                     for ob in all_projects:
1146                         if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
1147                             roots.append(ob['uuid'])
1148                             root_owners.add(ob['owner_uuid'])
1149
1150                     lusers = arvados.util.keyset_list_all(
1151                         self.api.users().list,
1152                         order_key="uuid",
1153                         num_retries=self.num_retries,
1154                         filters=[['uuid','in', list(root_owners)]])
1155                     lgroups = arvados.util.keyset_list_all(
1156                         self.api.groups().list,
1157                         order_key="uuid",
1158                         num_retries=self.num_retries,
1159                         filters=[['uuid','in', list(root_owners)+roots]])
1160
1161                     for l in lusers:
1162                         objects[l["uuid"]] = l
1163                     for l in lgroups:
1164                         objects[l["uuid"]] = l
1165
1166                 for r in root_owners:
1167                     if r in objects:
1168                         obr = objects[r]
1169                         if obr.get("name"):
1170                             contents[obr["name"]] = obr
1171                         elif "first_name" in obr:
1172                             contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
1173
1174                 for r in roots:
1175                     if r in objects:
1176                         obr = objects[r]
1177                         if obr['owner_uuid'] not in objects:
1178                             contents[obr["name"]] = obr
1179
1180             # end with llfuse.lock_released, re-acquire lock
1181
1182             self.merge(viewitems(contents),
1183                        lambda i: i[0],
1184                        lambda a, i: a.uuid() == i[1]['uuid'],
1185                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
1186         except Exception:
1187             _logger.exception("arv-mount shared dir error")
1188         finally:
1189             self._updating_lock.release()
1190
1191     def want_event_subscribe(self):
1192         return True