6833: add poll_time to CollectionDirectory.
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
1 import logging
2 import re
3 import time
4 import llfuse
5 import arvados
6 import apiclient
7 import functools
8 import threading
9 from apiclient import errors as apiclient_errors
10 import errno
11 import time
12
13 from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
14 from fresh import FreshBase, convertTime, use_counter, check_update
15
16 import arvados.collection
17 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18
_logger = logging.getLogger('arvados.arvados_fuse')


# Characters that FUSE or Linux cannot accept inside a single filename.
# Wherever they occur in a collection filename they are shown as "_" in
# the fuse mount.
_disallowed_filename_characters = re.compile(r'[\x00/]')

# Names that cannot be used verbatim as directory entries, mapped to their
# replacements.  ('.' and '..' are not reachable if the API server is newer
# than #6277, but they are still mapped here.)
_reserved_name_replacements = {
    '': '_',
    '.': '_',
    '..': '__',
}


def sanitize_filename(dirty):
    """Return *dirty* made safe to use as a FUSE filename.

    None is passed through unchanged.  The empty string and "." become "_",
    and ".." becomes "__".  Any other name has every NUL and "/" replaced
    with "_".
    """
    if dirty is None:
        return None
    if dirty in _reserved_name_replacements:
        return _reserved_name_replacements[dirty]
    return _disallowed_filename_characters.sub('_', dirty)
40
41
class Directory(FreshBase):
    """Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.  Read accessors
    are decorated with ``use_counter`` and ``check_update`` (from fresh.py).
    Subclasses override update() to (re)populate the entries.
    """

    def __init__(self, parent_inode, inodes):
        """Create an empty directory.

        :parent_inode: integer inode number of the containing directory.
        :inodes: inode table used to add, touch, invalidate and delete entries.

        Raises Exception if parent_inode is not an int.
        """

        super(Directory, self).__init__()

        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self.inodes = inodes
        self._entries = {}
        self._mtime = time.time()

    #  Overriden by subclasses to implement logic to update the entries dict
    #  when the directory is stale
    @use_counter
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    # (stub)
    def size(self):
        return 0

    def persisted(self):
        return False

    def checkupdate(self):
        """Call update() if the directory is stale.

        An apiclient HttpError raised by update() is logged as a warning and
        otherwise ignored.
        """
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                # Logger.warn is a deprecated alias of Logger.warning.
                _logger.warning(e)

    @use_counter
    @check_update
    def __getitem__(self, item):
        return self._entries[item]

    @use_counter
    @check_update
    def items(self):
        """Return a list of (name, entry) pairs."""
        return list(self._entries.items())

    @use_counter
    @check_update
    def __contains__(self, k):
        return k in self._entries

    @use_counter
    @check_update
    def __len__(self):
        return len(self._entries)

    def fresh(self):
        """Touch this directory in self.inodes, then mark it fresh."""
        self.inodes.touch(self)
        super(Directory, self).fresh()

    def merge(self, items, fn, same, new_entry):
        """Helper method for updating the contents of the directory.

        Takes a list describing the new contents of the directory, reuse
        entries that are the same in both the old and new lists, create new
        entries, and delete old entries missing from the new list.

        :items: iterable with new directory contents

        :fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        :same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        :new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        If several items map to the same (sanitized) name, only the first one
        is used and later duplicates are skipped.  The directory is always
        marked fresh.  If anything was added or removed, its inode is
        invalidated and its mtime updated.
        """

        oldentries = self._entries
        self._entries = {}
        changed = False
        for i in items:
            name = sanitize_filename(fn(i))
            if not name:
                continue
            if name in self._entries:
                # Overwriting the entry already placed under this name would
                # leave it registered in self.inodes with nothing referencing
                # it.  It would also recreate an inode for the duplicate on
                # every merge.  Keep the first occurrence instead.
                _logger.debug("Ignoring duplicate entry '%s' on inode %i", name, self.inode)
                continue
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries[name]
                del oldentries[name]
            else:
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                    self._entries[name] = self.inodes.add_entry(ent)
                    changed = True

        # delete any other directory entries that were not in found in 'items'
        for i in oldentries:
            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
            self.inodes.del_entry(oldentries[i])
            changed = True

        if changed:
            self.inodes.invalidate_inode(self.inode)
            self._mtime = time.time()

        self.fresh()

    def clear(self, force=False):
        """Delete all entries, clearing each child first.

        Returns True if the directory was cleared.  Returns False if it is in
        use and *force* is false.  Also returns False if a child's clear()
        reports failure; in that case the entry dict is restored, and
        children cleared before the failure stay cleared.
        """

        if not self.in_use() or force:
            oldentries = self._entries
            self._entries = {}
            for n in oldentries:
                if not oldentries[n].clear(force):
                    self._entries = oldentries
                    return False
            for n in oldentries:
                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
                self.inodes.del_entry(oldentries[n])
            self.inodes.invalidate_inode(self.inode)
            self.invalidate()
            return True
        else:
            return False

    def mtime(self):
        return self._mtime

    def writable(self):
        return False

    def flush(self):
        pass

    def create(self, name):
        raise NotImplementedError()

    def mkdir(self, name):
        raise NotImplementedError()

    def unlink(self, name):
        raise NotImplementedError()

    def rmdir(self, name):
        raise NotImplementedError()

    def rename(self, name_old, name_new, src):
        raise NotImplementedError()
201
202
class CollectionDirectoryBase(Directory):
    """Represent an Arvados Collection as a directory.

    This class is used for Subcollections, and is also the base class for
    CollectionDirectory, which implements collection loading/saving on
    Collection records.

    Most operations act only on the underlying Arvados `Collection` object.
    The `Collection` object signals via a notify callback to
    `CollectionDirectoryBase.on_event` that an item was added, removed or
    modified.  FUSE inodes and directory entries are created, deleted or
    invalidated in response to these events.
    """

    def __init__(self, parent_inode, inodes, collection):
        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
        self.collection = collection

    def new_entry(self, name, item, mtime):
        """Create, or re-attach, the FUSE entry for collection item *item*.

        :name: entry name; it is sanitized before use.
        :item: a file or subcollection object from the collection.
        :mtime: modification time for a newly created entry.

        If *item* already has a ``fuse_entry``, that entry must be dead and
        still have an inode; it is revived and reused.  Otherwise an
        Exception is raised.  Subcollections become nested
        CollectionDirectoryBase entries; anything else becomes a
        FuseArvadosFile.
        """
        name = sanitize_filename(name)
        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
            if item.fuse_entry.dead is not True:
                raise Exception("Can only reparent dead inode entry")
            if item.fuse_entry.inode is None:
                raise Exception("Reparented entry must still have valid inode")
            item.fuse_entry.dead = False
            self._entries[name] = item.fuse_entry
        elif isinstance(item, arvados.collection.RichCollectionBase):
            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
            self._entries[name].populate(mtime)
        else:
            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
        item.fuse_entry = self._entries[name]

    def on_event(self, event, collection, name, item):
        """Notify callback registered by populate().

        Events for collections other than self.collection are ignored.  ADD
        creates an entry, DEL removes one, and MOD invalidates the affected
        inode.  Runs with llfuse.lock held.
        """
        if collection == self.collection:
            name = sanitize_filename(name)
            _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
            with llfuse.lock:
                if event == arvados.collection.ADD:
                    self.new_entry(name, item, self.mtime())
                elif event == arvados.collection.DEL:
                    # The entry may be missing, e.g. when two collection
                    # names sanitize to the same entry name.  Raising
                    # KeyError here would propagate into the Collection's
                    # notify path.
                    ent = self._entries.pop(name, None)
                    if ent is None:
                        _logger.debug("collection notify: no entry '%s' to delete", name)
                    else:
                        self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
                        self.inodes.del_entry(ent)
                elif event == arvados.collection.MOD:
                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                        self.inodes.invalidate_inode(item.fuse_entry.inode)
                    elif name in self._entries:
                        self.inodes.invalidate_inode(self._entries[name].inode)

    def populate(self, mtime):
        """Set the mtime, subscribe to change events, and create an entry per item."""
        self._mtime = mtime
        self.collection.subscribe(self.on_event)
        for entry, item in self.collection.items():
            self.new_entry(entry, item, self.mtime())

    def writable(self):
        return self.collection.writable()

    @use_counter
    def flush(self):
        """Save the root collection, with llfuse.lock released during the save."""
        with llfuse.lock_released:
            self.collection.root_collection().save()

    @use_counter
    @check_update
    def create(self, name):
        with llfuse.lock_released:
            self.collection.open(name, "w").close()

    @use_counter
    @check_update
    def mkdir(self, name):
        with llfuse.lock_released:
            self.collection.mkdirs(name)

    @use_counter
    @check_update
    def unlink(self, name):
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rmdir(self, name):
        with llfuse.lock_released:
            self.collection.remove(name)
        self.flush()

    @use_counter
    @check_update
    def rename(self, name_old, name_new, src):
        """Move *name_old* from directory *src* to *name_new* in this directory.

        Raises FUSEError:
          EPERM if *src* is not a collection directory;
          ENOTEMPTY when replacing a non-empty directory;
          ENOTDIR when moving a directory onto a file;
          EISDIR when moving a file onto a directory.
        Both the source and target collections are flushed afterwards.
        """
        if not isinstance(src, CollectionDirectoryBase):
            raise llfuse.FUSEError(errno.EPERM)

        if name_new in self:
            ent = src[name_old]
            tgt = self[name_new]
            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
                pass
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
                if len(tgt) > 0:
                    raise llfuse.FUSEError(errno.ENOTEMPTY)
            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
                raise llfuse.FUSEError(errno.ENOTDIR)
            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
                raise llfuse.FUSEError(errno.EISDIR)

        with llfuse.lock_released:
            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
        self.flush()
        src.flush()
319
320
class CollectionDirectory(CollectionDirectoryBase):
    """Represents the root of a directory tree representing a collection.

    The collection is identified by ``collection_locator`` (a uuid or a
    portable data hash) and is loaded lazily by update().  Polling is
    configured through ``_poll``/``_poll_time``, the same attribute names
    used by the other polling directories in this module.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None,
                 explicit_collection=None, poll=True, poll_time=60 * 60):
        """
        :collection_record: a collection record dict (its 'uuid' and
          'modified_at' are used), a locator string, or None.
        :explicit_collection: accepted for interface compatibility; it is not
          used by this class.
        :poll: stored as ``_poll`` (default True, as before).
        :poll_time: stored as ``_poll_time`` (default 60 * 60, as before;
          presumably seconds -- confirm against FreshBase in fresh.py).
        """
        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
        self.api = api
        self.num_retries = num_retries
        self.collection_record_file = None
        self.collection_record = None
        # TagsDirectory, TagDirectory and ProjectDirectory all configure
        # polling through _poll/_poll_time.  The public poll/poll_time names
        # previously assigned here remain available as property aliases.
        self._poll = poll
        self._poll_time = poll_time

        if isinstance(collection_record, dict):
            self.collection_locator = collection_record['uuid']
            self._mtime = convertTime(collection_record.get('modified_at'))
        else:
            self.collection_locator = collection_record
            self._mtime = 0
        self._manifest_size = 0
        # Locators matching uuid_pattern are writable; others (e.g. portable
        # data hashes) are not.  The False default keeps writable() from
        # raising AttributeError when no locator was given.
        self._writable = False
        if self.collection_locator:
            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
        self._updating_lock = threading.Lock()

    @property
    def poll(self):
        """Alias for ``_poll``, kept for backward compatibility."""
        return self._poll

    @poll.setter
    def poll(self, value):
        self._poll = value

    @property
    def poll_time(self):
        """Alias for ``_poll_time``, kept for backward compatibility."""
        return self._poll_time

    @poll_time.setter
    def poll_time(self, value):
        self._poll_time = value

    def same(self, i):
        """Return True if record *i* has our locator as its uuid or portable data hash."""
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def writable(self):
        return self.collection.writable() if self.collection is not None else self._writable

    # Used by arv-web.py to switch the contents of the CollectionDirectory
    def change_collection(self, new_locator):
        """Switch the contents of the CollectionDirectory.

        Must be called with llfuse.lock held.
        """

        self.collection_locator = new_locator
        self.collection_record = None
        self.update()

    def new_collection(self, new_collection_record, coll_reader):
        """Replace the directory contents with *coll_reader*.

        Clears existing entries (once an inode is assigned), records the new
        collection record and its mtime/uuid, then populates from the
        collection.
        """
        if self.inode:
            self.clear(force=True)

        self.collection_record = new_collection_record

        if self.collection_record:
            self._mtime = convertTime(self.collection_record.get('modified_at'))
            self.collection_locator = self.collection_record["uuid"]
            if self.collection_record_file is not None:
                self.collection_record_file.update(self.collection_record)

        self.collection = coll_reader
        self.populate(self.mtime())

    def uuid(self):
        return self.collection_locator

    @use_counter
    def update(self, to_record_version=None):
        """Load or refresh the collection.

        :to_record_version: if given and the loaded collection reports it as a
          known past version, the refresh is skipped.

        Expects llfuse.lock to be held; it is released around network I/O.
        Returns True on success, including when there was nothing to do, and
        False if loading failed.  Errors are logged, not raised.
        """
        try:
            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Content addressed by a portable data hash cannot change.
                # Mark the directory fresh so that polling does not leave it
                # permanently stale.
                self.fresh()
                return True

            if self.collection_locator is None:
                self.fresh()
                return True

            try:
                with llfuse.lock_released:
                    self._updating_lock.acquire()
                    if not self.stale():
                        # Another caller refreshed the directory while we
                        # waited for the lock.  That is a success, not a
                        # failure, so return True like the other early exits.
                        return True

                    _logger.debug("Updating %s", to_record_version)
                    if self.collection is not None:
                        if self.collection.known_past_version(to_record_version):
                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
                        else:
                            self.collection.update()
                    else:
                        if uuid_pattern.match(self.collection_locator):
                            coll_reader = arvados.collection.Collection(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        else:
                            coll_reader = arvados.collection.CollectionReader(
                                self.collection_locator, self.api, self.api.keep,
                                num_retries=self.num_retries)
                        new_collection_record = coll_reader.api_response() or {}
                        # If the Collection only exists in Keep, there will be no API
                        # response.  Fill in the fields we need.
                        if 'uuid' not in new_collection_record:
                            new_collection_record['uuid'] = self.collection_locator
                        if "portable_data_hash" not in new_collection_record:
                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
                        if 'manifest_text' not in new_collection_record:
                            new_collection_record['manifest_text'] = coll_reader.manifest_text()

                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
                            self.new_collection(new_collection_record, coll_reader)

                        self._manifest_size = len(coll_reader.manifest_text())
                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
                # end with llfuse.lock_released, re-acquire lock

                self.fresh()
                return True
            finally:
                self._updating_lock.release()
        except arvados.errors.NotFoundError as e:
            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_record is not None and "manifest_text" in self.collection_record:
                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
        return False

    @use_counter
    @check_update
    def __getitem__(self, item):
        """Return entry *item*; '.arvados#collection' is an ObjectFile of the record."""
        if item == '.arvados#collection':
            if self.collection_record_file is None:
                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
                self.inodes.add_entry(self.collection_record_file)
            return self.collection_record_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def invalidate(self):
        """Drop the cached record and record file, then invalidate as usual."""
        self.collection_record = None
        self.collection_record_file = None
        super(CollectionDirectory, self).invalidate()

    def persisted(self):
        return (self.collection_locator is not None)

    def objsize(self):
        # This is an empirically-derived heuristic to estimate the memory used
        # to store this collection's metadata.  Calculating the memory
        # footprint directly would be more accurate, but also more complicated.
        return self._manifest_size * 128

    def finalize(self):
        """Save the collection if writable, then stop its threads."""
        if self.collection is not None:
            if self.writable():
                self.collection.save()
            self.collection.stop_threads()
479
480
class TmpCollectionDirectory(CollectionDirectoryBase):
    """Scratch directory backed by an Arvados collection that is never saved.

    Lets a userspace program use Keep as scratch space.  Reading the special
    ".arvados#collection" file returns the current manifest, which can be
    used to save a snapshot of the scratch data or as a crunch job output.
    """

    class UnsaveableCollection(arvados.collection.Collection):
        """Collection whose save() and save_new() are no-ops."""

        def save(self):
            pass

        def save_new(self):
            pass

    def __init__(self, parent_inode, inodes, api_client, num_retries):
        scratch = self.UnsaveableCollection(
            api_client=api_client,
            keep_client=api_client.keep)
        super(TmpCollectionDirectory, self).__init__(parent_inode, inodes, scratch)
        # Created lazily by __getitem__('.arvados#collection').
        self.collection_record_file = None
        self.populate(self.mtime())

    def on_event(self, *args, **kwargs):
        """Handle the change as usual, then invalidate the record file, if any."""
        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
        record_file = self.collection_record_file
        if not record_file:
            return
        with llfuse.lock:
            record_file.invalidate()
        self.inodes.invalidate_inode(record_file.inode)
        _logger.debug("%s invalidated collection record", self)

    def collection_record(self):
        """Build a record dict (uuid None) describing the scratch collection."""
        with llfuse.lock_released:
            manifest = self.collection.manifest_text()
            pdh = self.collection.portable_data_hash()
        return {
            "uuid": None,
            "manifest_text": manifest,
            "portable_data_hash": pdh,
        }

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        return super(TmpCollectionDirectory, self).__contains__(k)

    @use_counter
    def __getitem__(self, item):
        if item != '.arvados#collection':
            return super(TmpCollectionDirectory, self).__getitem__(item)
        if self.collection_record_file is None:
            record_file = FuncToJSONFile(self.inode, self.collection_record)
            self.collection_record_file = record_file
            self.inodes.add_entry(record_file)
        return self.collection_record_file

    def persisted(self):
        """Scratch data is never saved."""
        return False

    def writable(self):
        return True

    def finalize(self):
        self.collection.stop_threads()

    def invalidate(self):
        record_file = self.collection_record_file
        if record_file:
            record_file.invalidate()
        super(TmpCollectionDirectory, self).invalidate()
548
549
class MagicDirectory(Directory):
    """A special directory that logically contains the set of all extant keep locators.

    When a file is referenced by lookup(), it is tested to see if it is a valid
    keep locator to a manifest, and if so, loads the manifest contents as a
    subdirectory of this directory with the locator as the directory name.
    Since querying a list of all extant keep locators is impractical, only
    collections that have already been accessed are visible to readdir().

    """

    README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdef0123456789abcdef+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.

""".lstrip()

    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
        """:pdh_only: if true, only portable data hashes are looked up, not uuids."""
        super(MagicDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.pdh_only = pdh_only

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
              (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))

    def __contains__(self, k):
        """Return True if *k* is a known entry or a collection that can be loaded.

        A locator not seen before is loaded as a CollectionDirectory and kept
        as an entry if update() succeeds; otherwise its inode is released.
        """
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
            return False

        # Keep the entry and the caught exception in separate names.  The
        # exception handler must be able to release the entry (and only if
        # add_entry actually returned one).
        ent = None
        try:
            ent = self.inodes.add_entry(CollectionDirectory(
                    self.inode, self.inodes, self.api, self.num_retries, k))
            found = ent.update()
        except Exception as e:
            _logger.debug('arv-mount exception keep %s', e)
            if ent is not None:
                self.inodes.del_entry(ent)
            return False

        if found and k not in self._entries:
            self._entries[k] = ent
        else:
            # Either the collection could not be loaded, or another lookup
            # added the same locator meanwhile.
            self.inodes.del_entry(ent)
        return bool(found)

    def __getitem__(self, item):
        """Return the entry for *item*; raises KeyError if it cannot be loaded."""
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)

    def clear(self, force=False):
        """No-op.  Returns None, which Directory.clear treats as a refusal."""
        pass
624
625
class RecursiveInvalidateDirectory(Directory):
    """Directory whose invalidate() also invalidates every child entry."""

    def invalidate(self):
        """Invalidate this directory and then each of its entries.

        Errors are logged with a traceback rather than propagated.
        """
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for entry in self._entries.values():
                entry.invalidate()
        except Exception:
            # Logger.exception() requires a message argument.  Calling it
            # with none raises TypeError from inside this handler.
            _logger.exception("Error recursively invalidating directory inode %s", self.inode)
634
635
class TagsDirectory(RecursiveInvalidateDirectory):
    """Directory with one TagDirectory subdirectory per tag name visible to the user."""

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        # Polling is always enabled here; both settings are passed on to
        # each TagDirectory this directory creates.
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Rebuild the entries from the distinct names of 'tag' links.

        If the API response has no 'items' key, the entries are left as is.
        """
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                ).execute(num_retries=self.num_retries)
        if "items" not in tags:
            return

        def tag_name(link):
            return link['name']

        def is_same_tag(existing, link):
            return existing.tag == link['name']

        def make_tag_directory(link):
            return TagDirectory(self.inode, self.inodes, self.api, self.num_retries,
                                link['name'], poll=self._poll, poll_time=self._poll_time)

        self.merge(tags['items'], tag_name, is_same_tag, make_tag_directory)
658
659
class TagDirectory(Directory):
    """A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    """

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        """:tag: the tag (link name) whose tagged collections are listed."""
        super(TagDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Rebuild the entries from 'tag' links named self.tag that point at collections.

        Each link's head_uuid becomes a CollectionDirectory named after that
        uuid.  If the API response has no 'items' key, the current entries
        are left untouched.  This mirrors the guard used for the tag listing,
        rather than letting a KeyError escape update().
        """
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                select=['head_uuid']
                ).execute(num_retries=self.num_retries)
        if "items" not in taggedcollections:
            return
        self.merge(taggedcollections['items'],
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
687
688
689 class ProjectDirectory(Directory):
690     """A special directory that contains the contents of a project."""
691
692     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
693                  poll=False, poll_time=60):
694         super(ProjectDirectory, self).__init__(parent_inode, inodes)
695         self.api = api
696         self.num_retries = num_retries
697         self.project_object = project_object
698         self.project_object_file = None
699         self.project_uuid = project_object['uuid']
700         self._poll = poll
701         self._poll_time = poll_time
702         self._updating_lock = threading.Lock()
703         self._current_user = None
704
705     def createDirectory(self, i):
706         if collection_uuid_pattern.match(i['uuid']):
707             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
708         elif group_uuid_pattern.match(i['uuid']):
709             return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
710         elif link_uuid_pattern.match(i['uuid']):
711             if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
712                 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
713             else:
714                 return None
715         elif uuid_pattern.match(i['uuid']):
716             return ObjectFile(self.parent_inode, i)
717         else:
718             return None
719
720     def uuid(self):
721         return self.project_uuid
722
723     @use_counter
724     def update(self):
725         if self.project_object_file == None:
726             self.project_object_file = ObjectFile(self.inode, self.project_object)
727             self.inodes.add_entry(self.project_object_file)
728
729         def namefn(i):
730             if 'name' in i:
731                 if i['name'] is None or len(i['name']) == 0:
732                     return None
733                 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
734                     # collection or subproject
735                     return i['name']
736                 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
737                     # name link
738                     return i['name']
739                 elif 'kind' in i and i['kind'].startswith('arvados#'):
740                     # something else
741                     return "{}.{}".format(i['name'], i['kind'][8:])
742             else:
743                 return None
744
745         def samefn(a, i):
746             if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
747                 return a.uuid() == i['uuid']
748             elif isinstance(a, ObjectFile):
749                 return a.uuid() == i['uuid'] and not a.stale()
750             return False
751
752         try:
753             with llfuse.lock_released:
754                 self._updating_lock.acquire()
755                 if not self.stale():
756                     return
757
758                 if group_uuid_pattern.match(self.project_uuid):
759                     self.project_object = self.api.groups().get(
760                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
761                 elif user_uuid_pattern.match(self.project_uuid):
762                     self.project_object = self.api.users().get(
763                         uuid=self.project_uuid).execute(num_retries=self.num_retries)
764
765                 contents = arvados.util.list_all(self.api.groups().contents,
766                                                  self.num_retries, uuid=self.project_uuid)
767
768             # end with llfuse.lock_released, re-acquire lock
769
770             self.merge(contents,
771                        namefn,
772                        samefn,
773                        self.createDirectory)
774         finally:
775             self._updating_lock.release()
776
777     @use_counter
778     @check_update
779     def __getitem__(self, item):
780         if item == '.arvados#project':
781             return self.project_object_file
782         else:
783             return super(ProjectDirectory, self).__getitem__(item)
784
785     def __contains__(self, k):
786         if k == '.arvados#project':
787             return True
788         else:
789             return super(ProjectDirectory, self).__contains__(k)
790
791     @use_counter
792     @check_update
793     def writable(self):
794         with llfuse.lock_released:
795             if not self._current_user:
796                 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
797             return self._current_user["uuid"] in self.project_object["writable_by"]
798
799     def persisted(self):
800         return True
801
802     @use_counter
803     @check_update
804     def mkdir(self, name):
805         try:
806             with llfuse.lock_released:
807                 self.api.collections().create(body={"owner_uuid": self.project_uuid,
808                                                     "name": name,
809                                                     "manifest_text": ""}).execute(num_retries=self.num_retries)
810             self.invalidate()
811         except apiclient_errors.Error as error:
812             _logger.error(error)
813             raise llfuse.FUSEError(errno.EEXIST)
814
815     @use_counter
816     @check_update
817     def rmdir(self, name):
818         if name not in self:
819             raise llfuse.FUSEError(errno.ENOENT)
820         if not isinstance(self[name], CollectionDirectory):
821             raise llfuse.FUSEError(errno.EPERM)
822         if len(self[name]) > 0:
823             raise llfuse.FUSEError(errno.ENOTEMPTY)
824         with llfuse.lock_released:
825             self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
826         self.invalidate()
827
828     @use_counter
829     @check_update
830     def rename(self, name_old, name_new, src):
831         if not isinstance(src, ProjectDirectory):
832             raise llfuse.FUSEError(errno.EPERM)
833
834         ent = src[name_old]
835
836         if not isinstance(ent, CollectionDirectory):
837             raise llfuse.FUSEError(errno.EPERM)
838
839         if name_new in self:
840             # POSIX semantics for replacing one directory with another is
841             # tricky (the target directory must be empty, the operation must be
842             # atomic which isn't possible with the Arvados API as of this
843             # writing) so don't support that.
844             raise llfuse.FUSEError(errno.EPERM)
845
846         self.api.collections().update(uuid=ent.uuid(),
847                                       body={"owner_uuid": self.uuid(),
848                                             "name": name_new}).execute(num_retries=self.num_retries)
849
850         # Acually move the entry from source directory to this directory.
851         del src._entries[name_old]
852         self._entries[name_new] = ent
853         self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
854
855
class SharedDirectory(Directory):
    """A special directory that represents users or groups who have shared projects with me."""

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        """Create the shared-projects directory.

        ``parent_inode`` and ``inodes`` are passed to Directory.  ``api`` is
        the Arvados API client; ``num_retries`` is used for every API call.
        The current user's record is fetched here (a blocking API call), so
        update() can tell which projects are owned by someone else.

        ``poll_time`` is stored and passed to each child ProjectDirectory.
        """
        super(SharedDirectory, self).__init__(parent_inode, inodes)
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        # NOTE(review): polling is forced on; the ``poll`` argument is ignored,
        # and ``exclude`` is not used either.  Kept unchanged because callers
        # using the default poll=False may rely on shared listings still being
        # polled -- confirm before honouring ``poll``.
        self._poll = True
        self._poll_time = poll_time

    @use_counter
    def update(self):
        """Rebuild the listing of users/groups that have shared projects with me.

        All projects visible to the current user are fetched.  A project is a
        "root" if it is not owned by the current user and its owner is not
        itself a visible project.

        - Each root owner that resolves to a user or group record becomes an
          entry, named after the record's "name" or, failing that,
          "first_name last_name".
        - Roots whose owner could not be resolved are listed under the
          project's own name.

        Entries are ProjectDirectory objects created with this directory's
        poll settings.  API requests run with the llfuse lock released.
        """
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            # Resolve root owners: each may be a user or a group.
            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for rec in lusers:
                objects[rec["uuid"]] = rec
            for rec in lgroups:
                objects[rec["uuid"]] = rec

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if obr.get("name"):
                        contents[obr["name"]] = obr
                    #elif obr.get("username"):
                    #    contents[obr["username"]] = obr
                    elif "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        try:
            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid() == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception:
            # Best effort: log the failure with its traceback instead of
            # propagating it.  Logger.exception() requires a message; the
            # previous bare call raised TypeError from inside this handler.
            _logger.exception("SharedDirectory: failed to update listing")