9 from apiclient import errors as apiclient_errors
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter, check_update
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18 _logger = logging.getLogger('arvados.arvados_fuse')
21 # Match any character which FUSE or Linux cannot accommodate as part
22 # of a filename. (If present in a collection filename, they will
23 # appear as underscores in the fuse mount.)
24 _disallowed_filename_characters = re.compile('[\x00/]')
26 def sanitize_filename(dirty):
27 """Replace disallowed filename characters with harmless "_"."""
37 return _disallowed_filename_characters.sub('_', dirty)
40 class Directory(FreshBase):
41 """Generic directory object, backed by a dict.
43 Consists of a set of entries with the key representing the filename
44 and the value referencing a File or Directory object.
47 def __init__(self, parent_inode, inodes):
48 """parent_inode is the integer inode number"""
50 super(Directory, self).__init__()
53 if not isinstance(parent_inode, int):
54 raise Exception("parent_inode should be an int")
55 self.parent_inode = parent_inode
58 self._mtime = time.time()
60 # Overridden by subclasses to implement logic to update the entries dict
61 # when the directory is stale
66 # Only used when computing the size of the disk footprint of the directory
74 def checkupdate(self):
78 except apiclient.errors.HttpError as e:
83 def __getitem__(self, item):
84 return self._entries[item]
89 return list(self._entries.items())
93 def __contains__(self, k):
94 return k in self._entries
99 return len(self._entries)
102 self.inodes.touch(self)
103 super(Directory, self).fresh()
105 def merge(self, items, fn, same, new_entry):
106 """Helper method for updating the contents of the directory.
108 Takes a list describing the new contents of the directory, reuses
109 entries that are the same in both the old and new lists, creates new
110 entries, and deletes old entries missing from the new list.
112 :items: iterable with new directory contents
114 :fn: function to take an entry in 'items' and return the desired file or
115 directory name, or None if this entry should be skipped
117 :same: function to compare an existing entry (a File or Directory
118 object) with an entry in the items list to determine whether to keep
121 :new_entry: function to create a new directory entry (File or Directory
122 object) from an entry in the items list.
126 oldentries = self._entries
130 name = sanitize_filename(fn(i))
132 if name in oldentries and same(oldentries[name], i):
133 # move existing directory entry over
134 self._entries[name] = oldentries[name]
137 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
138 # create new directory entry
141 self._entries[name] = self.inodes.add_entry(ent)
144 # delete any other directory entries that were not found in 'items'
146 _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
147 llfuse.invalidate_entry(self.inode, str(i))
148 self.inodes.del_entry(oldentries[i])
152 llfuse.invalidate_inode(self.inode)
153 self._mtime = time.time()
157 def clear(self, force=False):
158 """Delete all entries"""
160 if not self.in_use() or force:
161 oldentries = self._entries
164 if not oldentries[n].clear(force):
165 self._entries = oldentries
168 llfuse.invalidate_entry(self.inode, str(n))
169 self.inodes.del_entry(oldentries[n])
170 llfuse.invalidate_inode(self.inode)
185 def create(self, name):
186 raise NotImplementedError()
188 def mkdir(self, name):
189 raise NotImplementedError()
191 def unlink(self, name):
192 raise NotImplementedError()
194 def rmdir(self, name):
195 raise NotImplementedError()
197 def rename(self, name_old, name_new, src):
198 raise NotImplementedError()
201 class CollectionDirectoryBase(Directory):
202 """Represent an Arvados Collection as a directory.
204 This class is used for Subcollections, and is also the base class for
205 CollectionDirectory, which implements collection loading/saving on
208 Most operations act only on the underlying Arvados `Collection` object. The
209 `Collection` object signals via a notify callback to
210 `CollectionDirectoryBase.on_event` that an item was added, removed or
211 modified. FUSE inodes and directory entries are created, deleted or
212 invalidated in response to these events.
216 def __init__(self, parent_inode, inodes, collection):
217 super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
218 self.collection = collection
220 def new_entry(self, name, item, mtime):
221 name = sanitize_filename(name)
222 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
223 if item.fuse_entry.dead is not True:
224 raise Exception("Can only reparent dead inode entry")
225 if item.fuse_entry.inode is None:
226 raise Exception("Reparented entry must still have valid inode")
227 item.fuse_entry.dead = False
228 self._entries[name] = item.fuse_entry
229 elif isinstance(item, arvados.collection.RichCollectionBase):
230 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
231 self._entries[name].populate(mtime)
233 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
234 item.fuse_entry = self._entries[name]
236 def on_event(self, event, collection, name, item):
237 if collection == self.collection:
238 _logger.debug("%s %s %s %s", event, collection, name, item)
240 if event == arvados.collection.ADD:
241 self.new_entry(name, item, self.mtime())
242 elif event == arvados.collection.DEL:
243 ent = self._entries[name]
244 del self._entries[name]
245 llfuse.invalidate_entry(self.inode, name)
246 self.inodes.del_entry(ent)
247 elif event == arvados.collection.MOD:
248 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
249 llfuse.invalidate_inode(item.fuse_entry.inode)
250 elif name in self._entries:
251 llfuse.invalidate_inode(self._entries[name].inode)
253 def populate(self, mtime):
255 self.collection.subscribe(self.on_event)
256 for entry, item in self.collection.items():
257 self.new_entry(entry, item, self.mtime())
260 return self.collection.writable()
264 with llfuse.lock_released:
265 self.collection.root_collection().save()
269 def create(self, name):
270 with llfuse.lock_released:
271 self.collection.open(name, "w").close()
275 def mkdir(self, name):
276 with llfuse.lock_released:
277 self.collection.mkdirs(name)
281 def unlink(self, name):
282 with llfuse.lock_released:
283 self.collection.remove(name)
288 def rmdir(self, name):
289 with llfuse.lock_released:
290 self.collection.remove(name)
295 def rename(self, name_old, name_new, src):
296 if not isinstance(src, CollectionDirectoryBase):
297 raise llfuse.FUSEError(errno.EPERM)
302 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
304 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
306 raise llfuse.FUSEError(errno.ENOTEMPTY)
307 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
308 raise llfuse.FUSEError(errno.ENOTDIR)
309 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
310 raise llfuse.FUSEError(errno.EISDIR)
312 with llfuse.lock_released:
313 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
318 class CollectionDirectory(CollectionDirectoryBase):
319 """Represents the root of a directory tree representing a collection."""
321 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
322 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
324 self.num_retries = num_retries
325 self.collection_record_file = None
326 self.collection_record = None
327 if isinstance(collection_record, dict):
328 self.collection_locator = collection_record['uuid']
329 self._mtime = convertTime(collection_record.get('modified_at'))
331 self.collection_locator = collection_record
333 self._manifest_size = 0
334 if self.collection_locator:
335 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
336 self._updating_lock = threading.Lock()
339 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
342 return self.collection.writable() if self.collection is not None else self._writable
344 # Used by arv-web.py to switch the contents of the CollectionDirectory
345 def change_collection(self, new_locator):
346 """Switch the contents of the CollectionDirectory.
348 Must be called with llfuse.lock held.
351 self.collection_locator = new_locator
352 self.collection_record = None
355 def new_collection(self, new_collection_record, coll_reader):
357 self.clear(force=True)
359 self.collection_record = new_collection_record
361 if self.collection_record:
362 self._mtime = convertTime(self.collection_record.get('modified_at'))
363 self.collection_locator = self.collection_record["uuid"]
364 if self.collection_record_file is not None:
365 self.collection_record_file.update(self.collection_record)
367 self.collection = coll_reader
368 self.populate(self.mtime())
371 return self.collection_locator
374 def update(self, to_pdh=None):
376 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
379 if self.collection_locator is None:
384 with llfuse.lock_released:
385 self._updating_lock.acquire()
389 _logger.debug("Updating %s", self.collection_locator)
391 if self.collection.portable_data_hash() == to_pdh:
392 _logger.debug("%s is fresh at pdh '%s'", self.collection_locator, to_pdh)
394 self.collection.update()
396 if uuid_pattern.match(self.collection_locator):
397 coll_reader = arvados.collection.Collection(
398 self.collection_locator, self.api, self.api.keep,
399 num_retries=self.num_retries)
401 coll_reader = arvados.collection.CollectionReader(
402 self.collection_locator, self.api, self.api.keep,
403 num_retries=self.num_retries)
404 new_collection_record = coll_reader.api_response() or {}
405 # If the Collection only exists in Keep, there will be no API
406 # response. Fill in the fields we need.
407 if 'uuid' not in new_collection_record:
408 new_collection_record['uuid'] = self.collection_locator
409 if "portable_data_hash" not in new_collection_record:
410 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
411 if 'manifest_text' not in new_collection_record:
412 new_collection_record['manifest_text'] = coll_reader.manifest_text()
414 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
415 self.new_collection(new_collection_record, coll_reader)
417 self._manifest_size = len(coll_reader.manifest_text())
418 _logger.debug("%s manifest_size %i", self, self._manifest_size)
419 # end with llfuse.lock_released, re-acquire lock
424 self._updating_lock.release()
425 except arvados.errors.NotFoundError:
426 _logger.exception("arv-mount %s: error", self.collection_locator)
427 except arvados.errors.ArgumentError as detail:
428 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
429 if self.collection_record is not None and "manifest_text" in self.collection_record:
430 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
432 _logger.exception("arv-mount %s: error", self.collection_locator)
433 if self.collection_record is not None and "manifest_text" in self.collection_record:
434 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
439 def __getitem__(self, item):
440 if item == '.arvados#collection':
441 if self.collection_record_file is None:
442 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
443 self.inodes.add_entry(self.collection_record_file)
444 return self.collection_record_file
446 return super(CollectionDirectory, self).__getitem__(item)
448 def __contains__(self, k):
449 if k == '.arvados#collection':
452 return super(CollectionDirectory, self).__contains__(k)
454 def invalidate(self):
455 self.collection_record = None
456 self.collection_record_file = None
457 super(CollectionDirectory, self).invalidate()
460 return (self.collection_locator is not None)
463 # This is an empirically-derived heuristic to estimate the memory used
464 # to store this collection's metadata. Calculating the memory
465 # footprint directly would be more accurate, but also more complicated.
466 return self._manifest_size * 128
469 class MagicDirectory(Directory):
470 """A special directory that logically contains the set of all extant keep locators.
472 When a file is referenced by lookup(), it is tested to see if it is a valid
473 keep locator to a manifest, and if so, loads the manifest contents as a
474 subdirectory of this directory with the locator as the directory name.
475 Since querying a list of all extant keep locators is impractical, only
476 collections that have already been accessed are visible to readdir().
481 This directory provides access to Arvados collections as subdirectories listed
482 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
483 the form '1234567890abcdefghijklmnopqrstuv+123').
485 Note that this directory will appear empty until you attempt to access a
486 specific collection subdirectory (such as trying to 'cd' into it), at which
487 point the collection will actually be looked up on the server and the directory
488 will appear if it exists.
491 def __init__(self, parent_inode, inodes, api, num_retries):
492 super(MagicDirectory, self).__init__(parent_inode, inodes)
494 self.num_retries = num_retries
496 def __setattr__(self, name, value):
497 super(MagicDirectory, self).__setattr__(name, value)
498 # When we're assigned an inode, add a README.
499 if ((name == 'inode') and (self.inode is not None) and
500 (not self._entries)):
501 self._entries['README'] = self.inodes.add_entry(
502 StringFile(self.inode, self.README_TEXT, time.time()))
503 # If we're the root directory, add an identical by_id subdirectory.
504 if self.inode == llfuse.ROOT_INODE:
505 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
506 self.inode, self.inodes, self.api, self.num_retries))
508 def __contains__(self, k):
509 if k in self._entries:
512 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
516 e = self.inodes.add_entry(CollectionDirectory(
517 self.inode, self.inodes, self.api, self.num_retries, k))
524 except Exception as e:
525 _logger.debug('arv-mount exception keep %s', e)
528 def __getitem__(self, item):
530 return self._entries[item]
532 raise KeyError("No collection with id " + item)
534 def clear(self, force=False):
538 class RecursiveInvalidateDirectory(Directory):
539 def invalidate(self):
541 super(RecursiveInvalidateDirectory, self).invalidate()
542 for a in self._entries:
543 self._entries[a].invalidate()
548 class TagsDirectory(RecursiveInvalidateDirectory):
549 """A special directory that contains as subdirectories all tags visible to the user."""
551 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
552 super(TagsDirectory, self).__init__(parent_inode, inodes)
554 self.num_retries = num_retries
556 self._poll_time = poll_time
560 with llfuse.lock_released:
561 tags = self.api.links().list(
562 filters=[['link_class', '=', 'tag']],
563 select=['name'], distinct=True
564 ).execute(num_retries=self.num_retries)
566 self.merge(tags['items'],
568 lambda a, i: a.tag == i['name'],
569 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
572 class TagDirectory(Directory):
573 """A special directory that contains as subdirectories all collections visible
574 to the user that are tagged with a particular tag.
577 def __init__(self, parent_inode, inodes, api, num_retries, tag,
578 poll=False, poll_time=60):
579 super(TagDirectory, self).__init__(parent_inode, inodes)
581 self.num_retries = num_retries
584 self._poll_time = poll_time
588 with llfuse.lock_released:
589 taggedcollections = self.api.links().list(
590 filters=[['link_class', '=', 'tag'],
591 ['name', '=', self.tag],
592 ['head_uuid', 'is_a', 'arvados#collection']],
594 ).execute(num_retries=self.num_retries)
595 self.merge(taggedcollections['items'],
596 lambda i: i['head_uuid'],
597 lambda a, i: a.collection_locator == i['head_uuid'],
598 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
601 class ProjectDirectory(Directory):
602 """A special directory that contains the contents of a project."""
604 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
605 poll=False, poll_time=60):
606 super(ProjectDirectory, self).__init__(parent_inode, inodes)
608 self.num_retries = num_retries
609 self.project_object = project_object
610 self.project_object_file = None
611 self.project_uuid = project_object['uuid']
613 self._poll_time = poll_time
614 self._updating_lock = threading.Lock()
615 self._current_user = None
617 def createDirectory(self, i):
618 if collection_uuid_pattern.match(i['uuid']):
619 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
620 elif group_uuid_pattern.match(i['uuid']):
621 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
622 elif link_uuid_pattern.match(i['uuid']):
623 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
624 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
627 elif uuid_pattern.match(i['uuid']):
628 return ObjectFile(self.parent_inode, i)
633 return self.project_uuid
637 if self.project_object_file == None:
638 self.project_object_file = ObjectFile(self.inode, self.project_object)
639 self.inodes.add_entry(self.project_object_file)
643 if i['name'] is None or len(i['name']) == 0:
645 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
646 # collection or subproject
648 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
651 elif 'kind' in i and i['kind'].startswith('arvados#'):
653 return "{}.{}".format(i['name'], i['kind'][8:])
658 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
659 return a.uuid() == i['uuid']
660 elif isinstance(a, ObjectFile):
661 return a.uuid() == i['uuid'] and not a.stale()
665 with llfuse.lock_released:
666 self._updating_lock.acquire()
670 if group_uuid_pattern.match(self.project_uuid):
671 self.project_object = self.api.groups().get(
672 uuid=self.project_uuid).execute(num_retries=self.num_retries)
673 elif user_uuid_pattern.match(self.project_uuid):
674 self.project_object = self.api.users().get(
675 uuid=self.project_uuid).execute(num_retries=self.num_retries)
677 contents = arvados.util.list_all(self.api.groups().contents,
678 self.num_retries, uuid=self.project_uuid)
680 # end with llfuse.lock_released, re-acquire lock
685 self.createDirectory)
687 self._updating_lock.release()
691 def __getitem__(self, item):
692 if item == '.arvados#project':
693 return self.project_object_file
695 return super(ProjectDirectory, self).__getitem__(item)
697 def __contains__(self, k):
698 if k == '.arvados#project':
701 return super(ProjectDirectory, self).__contains__(k)
706 with llfuse.lock_released:
707 if not self._current_user:
708 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
709 return self._current_user["uuid"] in self.project_object["writable_by"]
716 def mkdir(self, name):
718 with llfuse.lock_released:
719 self.api.collections().create(body={"owner_uuid": self.project_uuid,
721 "manifest_text": ""}).execute(num_retries=self.num_retries)
723 except apiclient_errors.Error as error:
725 raise llfuse.FUSEError(errno.EEXIST)
729 def rmdir(self, name):
731 raise llfuse.FUSEError(errno.ENOENT)
732 if not isinstance(self[name], CollectionDirectory):
733 raise llfuse.FUSEError(errno.EPERM)
734 if len(self[name]) > 0:
735 raise llfuse.FUSEError(errno.ENOTEMPTY)
736 with llfuse.lock_released:
737 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
742 def rename(self, name_old, name_new, src):
743 if not isinstance(src, ProjectDirectory):
744 raise llfuse.FUSEError(errno.EPERM)
748 if not isinstance(ent, CollectionDirectory):
749 raise llfuse.FUSEError(errno.EPERM)
752 # POSIX semantics for replacing one directory with another are
753 # tricky (the target directory must be empty, the operation must be
754 # atomic which isn't possible with the Arvados API as of this
755 # writing) so don't support that.
756 raise llfuse.FUSEError(errno.EPERM)
758 self.api.collections().update(uuid=ent.uuid(),
759 body={"owner_uuid": self.uuid(),
760 "name": name_new}).execute(num_retries=self.num_retries)
762 # Actually move the entry from source directory to this directory.
763 del src._entries[name_old]
764 self._entries[name_new] = ent
765 llfuse.invalidate_entry(src.inode, name_old)
768 class SharedDirectory(Directory):
769 """A special directory that represents users or groups who have shared projects with me."""
771 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
772 poll=False, poll_time=60):
773 super(SharedDirectory, self).__init__(parent_inode, inodes)
775 self.num_retries = num_retries
776 self.current_user = api.users().current().execute(num_retries=num_retries)
778 self._poll_time = poll_time
782 with llfuse.lock_released:
783 all_projects = arvados.util.list_all(
784 self.api.groups().list, self.num_retries,
785 filters=[['group_class','=','project']])
787 for ob in all_projects:
788 objects[ob['uuid']] = ob
792 for ob in all_projects:
793 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
795 root_owners[ob['owner_uuid']] = True
797 lusers = arvados.util.list_all(
798 self.api.users().list, self.num_retries,
799 filters=[['uuid','in', list(root_owners)]])
800 lgroups = arvados.util.list_all(
801 self.api.groups().list, self.num_retries,
802 filters=[['uuid','in', list(root_owners)]])
808 objects[l["uuid"]] = l
810 objects[l["uuid"]] = l
813 for r in root_owners:
817 contents[obr["name"]] = obr
818 #elif obr.get("username"):
819 # contents[obr["username"]] = obr
820 elif "first_name" in obr:
821 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
825 if r['owner_uuid'] not in objects:
826 contents[r['name']] = r
828 # end with llfuse.lock_released, re-acquire lock
831 self.merge(contents.items(),
833 lambda a, i: a.uuid() == i[1]['uuid'],
834 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))