9 from apiclient import errors as apiclient_errors
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter, check_update
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18 _logger = logging.getLogger('arvados.arvados_fuse')
21 # Match any character which FUSE or Linux cannot accommodate as part
22 # of a filename. (If present in a collection filename, they will
23 # appear as underscores in the fuse mount.)
24 _disallowed_filename_characters = re.compile('[\x00/]')
26 def sanitize_filename(dirty):
27 """Replace disallowed filename characters with harmless "_"."""
37 return _disallowed_filename_characters.sub('_', dirty)
40 class Directory(FreshBase):
41 """Generic directory object, backed by a dict.
43 Consists of a set of entries with the key representing the filename
44 and the value referencing a File or Directory object.
47 def __init__(self, parent_inode, inodes):
48 """parent_inode is the integer inode number"""
50 super(Directory, self).__init__()
53 if not isinstance(parent_inode, int):
54 raise Exception("parent_inode should be an int")
55 self.parent_inode = parent_inode
58 self._mtime = time.time()
60 # Overridden by subclasses to implement logic to update the entries dict
61 # when the directory is stale
66 # Only used when computing the size of the disk footprint of the directory
74 def checkupdate(self):
78 except apiclient.errors.HttpError as e:
def __getitem__(self, item):
    """Return the entry stored under the name `item`.

    The lookup goes straight to the `_entries` mapping, so a name that
    is not present propagates that mapping's KeyError to the caller.
    """
    entries = self._entries
    return entries[item]
89 return list(self._entries.items())
def __contains__(self, k):
    """Report whether an entry named `k` is present in `_entries`."""
    entries = self._entries
    return k in entries
99 return len(self._entries)
102 self.inodes.touch(self)
103 super(Directory, self).fresh()
105 def merge(self, items, fn, same, new_entry):
106 """Helper method for updating the contents of the directory.
108 Takes a list describing the new contents of the directory, reuses
109 entries that are the same in both the old and new lists, creates new
110 entries, and deletes old entries missing from the new list.
112 :items: iterable with new directory contents
114 :fn: function to take an entry in 'items' and return the desired file or
115 directory name, or None if this entry should be skipped
117 :same: function to compare an existing entry (a File or Directory
118 object) with an entry in the items list to determine whether to keep
121 :new_entry: function to create a new directory entry (File or Directory
122 object) from an entry in the items list.
126 oldentries = self._entries
130 name = sanitize_filename(fn(i))
132 if name in oldentries and same(oldentries[name], i):
133 # move existing directory entry over
134 self._entries[name] = oldentries[name]
137 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
138 # create new directory entry
141 self._entries[name] = self.inodes.add_entry(ent)
144 # delete any other directory entries that were not found in 'items'
146 _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
147 self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
148 self.inodes.del_entry(oldentries[i])
152 self.inodes.invalidate_inode(self.inode)
153 self._mtime = time.time()
157 def clear(self, force=False):
158 """Delete all entries"""
160 if not self.in_use() or force:
161 oldentries = self._entries
164 if not oldentries[n].clear(force):
165 self._entries = oldentries
168 self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
169 self.inodes.del_entry(oldentries[n])
170 self.inodes.invalidate_inode(self.inode)
def create(self, name):
    """Always raise NotImplementedError.

    Subclasses that support the operation define their own `create`.
    """
    raise NotImplementedError()
def mkdir(self, name):
    """Always raise NotImplementedError.

    Subclasses that support the operation define their own `mkdir`.
    """
    raise NotImplementedError()
def unlink(self, name):
    """Always raise NotImplementedError.

    Subclasses that support the operation define their own `unlink`.
    """
    raise NotImplementedError()
def rmdir(self, name):
    """Always raise NotImplementedError.

    Subclasses that support the operation define their own `rmdir`.
    """
    raise NotImplementedError()
def rename(self, name_old, name_new, src):
    """Always raise NotImplementedError.

    Subclasses that support the operation define their own `rename`.
    """
    raise NotImplementedError()
201 class CollectionDirectoryBase(Directory):
202 """Represent an Arvados Collection as a directory.
204 This class is used for Subcollections, and is also the base class for
205 CollectionDirectory, which implements collection loading/saving on
208 Most operations act only on the underlying Arvados `Collection` object. The
209 `Collection` object signals via a notify callback to
210 `CollectionDirectoryBase.on_event` that an item was added, removed or
211 modified. FUSE inodes and directory entries are created, deleted or
212 invalidated in response to these events.
def __init__(self, parent_inode, inodes, collection):
    """Initialise the directory and remember the collection it presents.

    `parent_inode` and `inodes` are forwarded unchanged to the parent
    class initialiser; `collection` is stored as `self.collection`.
    """
    parent = super(CollectionDirectoryBase, self)
    parent.__init__(parent_inode, inodes)
    self.collection = collection
220 def new_entry(self, name, item, mtime):
221 name = sanitize_filename(name)
222 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
223 if item.fuse_entry.dead is not True:
224 raise Exception("Can only reparent dead inode entry")
225 if item.fuse_entry.inode is None:
226 raise Exception("Reparented entry must still have valid inode")
227 item.fuse_entry.dead = False
228 self._entries[name] = item.fuse_entry
229 elif isinstance(item, arvados.collection.RichCollectionBase):
230 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
231 self._entries[name].populate(mtime)
233 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
234 item.fuse_entry = self._entries[name]
236 def on_event(self, event, collection, name, item):
237 if collection == self.collection:
238 name = sanitize_filename(name)
239 _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
241 if event == arvados.collection.ADD:
242 self.new_entry(name, item, self.mtime())
243 elif event == arvados.collection.DEL:
244 ent = self._entries[name]
245 del self._entries[name]
246 self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
247 self.inodes.del_entry(ent)
248 elif event == arvados.collection.MOD:
249 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
250 self.inodes.invalidate_inode(item.fuse_entry.inode)
251 elif name in self._entries:
252 self.inodes.invalidate_inode(self._entries[name].inode)
254 def populate(self, mtime):
256 self.collection.subscribe(self.on_event)
257 for entry, item in self.collection.items():
258 self.new_entry(entry, item, self.mtime())
261 return self.collection.writable()
265 with llfuse.lock_released:
266 self.collection.root_collection().save()
def create(self, name):
    """Create `name` in the collection by opening it for writing and closing it.

    The open/close pair runs inside the `llfuse.lock_released` context.
    """
    with llfuse.lock_released:
        handle = self.collection.open(name, "w")
        handle.close()
def mkdir(self, name):
    """Ask the underlying collection to `mkdirs(name)`.

    The call runs inside the `llfuse.lock_released` context.
    """
    with llfuse.lock_released:
        target = self.collection
        target.mkdirs(name)
def unlink(self, name):
    """Remove `name` from the underlying collection.

    The `remove` call runs inside the `llfuse.lock_released` context.
    """
    with llfuse.lock_released:
        target = self.collection
        target.remove(name)
def rmdir(self, name):
    """Remove the directory `name` from the underlying collection.

    Uses the same collection `remove` call as `unlink`, made inside
    the `llfuse.lock_released` context.
    """
    with llfuse.lock_released:
        target = self.collection
        target.remove(name)
296 def rename(self, name_old, name_new, src):
297 if not isinstance(src, CollectionDirectoryBase):
298 raise llfuse.FUSEError(errno.EPERM)
303 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
305 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
307 raise llfuse.FUSEError(errno.ENOTEMPTY)
308 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
309 raise llfuse.FUSEError(errno.ENOTDIR)
310 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
311 raise llfuse.FUSEError(errno.EISDIR)
313 with llfuse.lock_released:
314 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
319 class CollectionDirectory(CollectionDirectoryBase):
320 """Represents the root of a directory tree representing a collection."""
322 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
323 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
325 self.num_retries = num_retries
326 self.collection_record_file = None
327 self.collection_record = None
328 if isinstance(collection_record, dict):
329 self.collection_locator = collection_record['uuid']
330 self._mtime = convertTime(collection_record.get('modified_at'))
332 self.collection_locator = collection_record
334 self._manifest_size = 0
335 if self.collection_locator:
336 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
337 self._updating_lock = threading.Lock()
340 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
343 return self.collection.writable() if self.collection is not None else self._writable
345 # Used by arv-web.py to switch the contents of the CollectionDirectory
346 def change_collection(self, new_locator):
347 """Switch the contents of the CollectionDirectory.
349 Must be called with llfuse.lock held.
352 self.collection_locator = new_locator
353 self.collection_record = None
356 def new_collection(self, new_collection_record, coll_reader):
358 self.clear(force=True)
360 self.collection_record = new_collection_record
362 if self.collection_record:
363 self._mtime = convertTime(self.collection_record.get('modified_at'))
364 self.collection_locator = self.collection_record["uuid"]
365 if self.collection_record_file is not None:
366 self.collection_record_file.update(self.collection_record)
368 self.collection = coll_reader
369 self.populate(self.mtime())
372 return self.collection_locator
375 def update(self, to_record_version=None):
377 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
380 if self.collection_locator is None:
385 with llfuse.lock_released:
386 self._updating_lock.acquire()
390 _logger.debug("Updating %s", to_record_version)
391 if self.collection is not None:
392 if self.collection.known_past_version(to_record_version):
393 _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
395 self.collection.update()
397 if uuid_pattern.match(self.collection_locator):
398 coll_reader = arvados.collection.Collection(
399 self.collection_locator, self.api, self.api.keep,
400 num_retries=self.num_retries)
402 coll_reader = arvados.collection.CollectionReader(
403 self.collection_locator, self.api, self.api.keep,
404 num_retries=self.num_retries)
405 new_collection_record = coll_reader.api_response() or {}
406 # If the Collection only exists in Keep, there will be no API
407 # response. Fill in the fields we need.
408 if 'uuid' not in new_collection_record:
409 new_collection_record['uuid'] = self.collection_locator
410 if "portable_data_hash" not in new_collection_record:
411 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
412 if 'manifest_text' not in new_collection_record:
413 new_collection_record['manifest_text'] = coll_reader.manifest_text()
415 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
416 self.new_collection(new_collection_record, coll_reader)
418 self._manifest_size = len(coll_reader.manifest_text())
419 _logger.debug("%s manifest_size %i", self, self._manifest_size)
420 # end with llfuse.lock_released, re-acquire lock
425 self._updating_lock.release()
426 except arvados.errors.NotFoundError as e:
427 _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
428 except arvados.errors.ArgumentError as detail:
429 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
430 if self.collection_record is not None and "manifest_text" in self.collection_record:
431 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
433 _logger.exception("arv-mount %s: error", self.collection_locator)
434 if self.collection_record is not None and "manifest_text" in self.collection_record:
435 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
440 def __getitem__(self, item):
441 if item == '.arvados#collection':
442 if self.collection_record_file is None:
443 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
444 self.inodes.add_entry(self.collection_record_file)
445 return self.collection_record_file
447 return super(CollectionDirectory, self).__getitem__(item)
449 def __contains__(self, k):
450 if k == '.arvados#collection':
453 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Forget the cached collection record and its record file, then invalidate.

    Both `collection_record` and `collection_record_file` are reset to
    None (in that order) before the parent class's `invalidate()` runs.
    """
    self.collection_record = self.collection_record_file = None
    super(CollectionDirectory, self).invalidate()
461 return (self.collection_locator is not None)
464 # This is an empirically-derived heuristic to estimate the memory used
465 # to store this collection's metadata. Calculating the memory
466 # footprint directly would be more accurate, but also more complicated.
467 return self._manifest_size * 128
470 if self.collection is not None:
472 self.collection.save()
473 self.collection.stop_threads()
476 class MagicDirectory(Directory):
477 """A special directory that logically contains the set of all extant keep locators.
479 When a file is referenced by lookup(), it is tested to see if it is a valid
480 keep locator to a manifest, and if so, loads the manifest contents as a
481 subdirectory of this directory with the locator as the directory name.
482 Since querying a list of all extant keep locators is impractical, only
483 collections that have already been accessed are visible to readdir().
488 This directory provides access to Arvados collections as subdirectories listed
489 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
490 the form '1234567890abcdefghijklmnopqrstuv+123').
492 Note that this directory will appear empty until you attempt to access a
493 specific collection subdirectory (such as trying to 'cd' into it), at which
494 point the collection will actually be looked up on the server and the directory
495 will appear if it exists.
498 def __init__(self, parent_inode, inodes, api, num_retries):
499 super(MagicDirectory, self).__init__(parent_inode, inodes)
501 self.num_retries = num_retries
503 def __setattr__(self, name, value):
504 super(MagicDirectory, self).__setattr__(name, value)
505 # When we're assigned an inode, add a README.
506 if ((name == 'inode') and (self.inode is not None) and
507 (not self._entries)):
508 self._entries['README'] = self.inodes.add_entry(
509 StringFile(self.inode, self.README_TEXT, time.time()))
510 # If we're the root directory, add an identical by_id subdirectory.
511 if self.inode == llfuse.ROOT_INODE:
512 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
513 self.inode, self.inodes, self.api, self.num_retries))
515 def __contains__(self, k):
516 if k in self._entries:
519 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
523 e = self.inodes.add_entry(CollectionDirectory(
524 self.inode, self.inodes, self.api, self.num_retries, k))
527 if k not in self._entries:
530 self.inodes.del_entry(e)
533 self.inodes.del_entry(e)
535 except Exception as e:
536 _logger.debug('arv-mount exception keep %s', e)
537 self.inodes.del_entry(e)
540 def __getitem__(self, item):
542 return self._entries[item]
544 raise KeyError("No collection with id " + item)
546 def clear(self, force=False):
550 class RecursiveInvalidateDirectory(Directory):
551 def invalidate(self):
553 super(RecursiveInvalidateDirectory, self).invalidate()
554 for a in self._entries:
555 self._entries[a].invalidate()
560 class TagsDirectory(RecursiveInvalidateDirectory):
561 """A special directory that contains as subdirectories all tags visible to the user."""
563 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
564 super(TagsDirectory, self).__init__(parent_inode, inodes)
566 self.num_retries = num_retries
568 self._poll_time = poll_time
572 with llfuse.lock_released:
573 tags = self.api.links().list(
574 filters=[['link_class', '=', 'tag']],
575 select=['name'], distinct=True
576 ).execute(num_retries=self.num_retries)
578 self.merge(tags['items'],
580 lambda a, i: a.tag == i['name'],
581 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
584 class TagDirectory(Directory):
585 """A special directory that contains as subdirectories all collections visible
586 to the user that are tagged with a particular tag.
589 def __init__(self, parent_inode, inodes, api, num_retries, tag,
590 poll=False, poll_time=60):
591 super(TagDirectory, self).__init__(parent_inode, inodes)
593 self.num_retries = num_retries
596 self._poll_time = poll_time
600 with llfuse.lock_released:
601 taggedcollections = self.api.links().list(
602 filters=[['link_class', '=', 'tag'],
603 ['name', '=', self.tag],
604 ['head_uuid', 'is_a', 'arvados#collection']],
606 ).execute(num_retries=self.num_retries)
607 self.merge(taggedcollections['items'],
608 lambda i: i['head_uuid'],
609 lambda a, i: a.collection_locator == i['head_uuid'],
610 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
613 class ProjectDirectory(Directory):
614 """A special directory that contains the contents of a project."""
616 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
617 poll=False, poll_time=60):
618 super(ProjectDirectory, self).__init__(parent_inode, inodes)
620 self.num_retries = num_retries
621 self.project_object = project_object
622 self.project_object_file = None
623 self.project_uuid = project_object['uuid']
625 self._poll_time = poll_time
626 self._updating_lock = threading.Lock()
627 self._current_user = None
629 def createDirectory(self, i):
630 if collection_uuid_pattern.match(i['uuid']):
631 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
632 elif group_uuid_pattern.match(i['uuid']):
633 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
634 elif link_uuid_pattern.match(i['uuid']):
635 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
636 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
639 elif uuid_pattern.match(i['uuid']):
640 return ObjectFile(self.parent_inode, i)
645 return self.project_uuid
649 if self.project_object_file == None:
650 self.project_object_file = ObjectFile(self.inode, self.project_object)
651 self.inodes.add_entry(self.project_object_file)
655 if i['name'] is None or len(i['name']) == 0:
657 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
658 # collection or subproject
660 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
663 elif 'kind' in i and i['kind'].startswith('arvados#'):
665 return "{}.{}".format(i['name'], i['kind'][8:])
670 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
671 return a.uuid() == i['uuid']
672 elif isinstance(a, ObjectFile):
673 return a.uuid() == i['uuid'] and not a.stale()
677 with llfuse.lock_released:
678 self._updating_lock.acquire()
682 if group_uuid_pattern.match(self.project_uuid):
683 self.project_object = self.api.groups().get(
684 uuid=self.project_uuid).execute(num_retries=self.num_retries)
685 elif user_uuid_pattern.match(self.project_uuid):
686 self.project_object = self.api.users().get(
687 uuid=self.project_uuid).execute(num_retries=self.num_retries)
689 contents = arvados.util.list_all(self.api.groups().contents,
690 self.num_retries, uuid=self.project_uuid)
692 # end with llfuse.lock_released, re-acquire lock
697 self.createDirectory)
699 self._updating_lock.release()
def __getitem__(self, item):
    """Look up `item`, special-casing the '.arvados#project' name.

    The name '.arvados#project' yields `self.project_object_file`;
    every other name is delegated to the parent class lookup.
    """
    if item == '.arvados#project':
        return self.project_object_file
    return super(ProjectDirectory, self).__getitem__(item)
709 def __contains__(self, k):
710 if k == '.arvados#project':
713 return super(ProjectDirectory, self).__contains__(k)
718 with llfuse.lock_released:
719 if not self._current_user:
720 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
721 return self._current_user["uuid"] in self.project_object["writable_by"]
728 def mkdir(self, name):
730 with llfuse.lock_released:
731 self.api.collections().create(body={"owner_uuid": self.project_uuid,
733 "manifest_text": ""}).execute(num_retries=self.num_retries)
735 except apiclient_errors.Error as error:
737 raise llfuse.FUSEError(errno.EEXIST)
741 def rmdir(self, name):
743 raise llfuse.FUSEError(errno.ENOENT)
744 if not isinstance(self[name], CollectionDirectory):
745 raise llfuse.FUSEError(errno.EPERM)
746 if len(self[name]) > 0:
747 raise llfuse.FUSEError(errno.ENOTEMPTY)
748 with llfuse.lock_released:
749 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
754 def rename(self, name_old, name_new, src):
755 if not isinstance(src, ProjectDirectory):
756 raise llfuse.FUSEError(errno.EPERM)
760 if not isinstance(ent, CollectionDirectory):
761 raise llfuse.FUSEError(errno.EPERM)
764 # POSIX semantics for replacing one directory with another is
765 # tricky (the target directory must be empty, the operation must be
766 # atomic which isn't possible with the Arvados API as of this
767 # writing) so don't support that.
768 raise llfuse.FUSEError(errno.EPERM)
770 self.api.collections().update(uuid=ent.uuid(),
771 body={"owner_uuid": self.uuid(),
772 "name": name_new}).execute(num_retries=self.num_retries)
774 # Actually move the entry from source directory to this directory.
775 del src._entries[name_old]
776 self._entries[name_new] = ent
777 self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
780 class SharedDirectory(Directory):
781 """A special directory that represents users or groups who have shared projects with me."""
783 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
784 poll=False, poll_time=60):
785 super(SharedDirectory, self).__init__(parent_inode, inodes)
787 self.num_retries = num_retries
788 self.current_user = api.users().current().execute(num_retries=num_retries)
790 self._poll_time = poll_time
794 with llfuse.lock_released:
795 all_projects = arvados.util.list_all(
796 self.api.groups().list, self.num_retries,
797 filters=[['group_class','=','project']])
799 for ob in all_projects:
800 objects[ob['uuid']] = ob
804 for ob in all_projects:
805 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
807 root_owners[ob['owner_uuid']] = True
809 lusers = arvados.util.list_all(
810 self.api.users().list, self.num_retries,
811 filters=[['uuid','in', list(root_owners)]])
812 lgroups = arvados.util.list_all(
813 self.api.groups().list, self.num_retries,
814 filters=[['uuid','in', list(root_owners)]])
820 objects[l["uuid"]] = l
822 objects[l["uuid"]] = l
825 for r in root_owners:
829 contents[obr["name"]] = obr
830 #elif obr.get("username"):
831 # contents[obr["username"]] = obr
832 elif "first_name" in obr:
833 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
837 if r['owner_uuid'] not in objects:
838 contents[r['name']] = r
840 # end with llfuse.lock_released, re-acquire lock
843 self.merge(contents.items(),
845 lambda a, i: a.uuid() == i[1]['uuid'],
846 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))