9 from apiclient import errors as apiclient_errors
12 from fusefile import StringFile, ObjectFile, FuseArvadosFile
13 from fresh import FreshBase, convertTime, use_counter
15 import arvados.collection
16 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
18 _logger = logging.getLogger('arvados.arvados_fuse')
21 # Match any character which FUSE or Linux cannot accommodate as part
22 # of a filename. (If present in a collection filename, they will
23 # appear as underscores in the fuse mount.)
24 _disallowed_filename_characters = re.compile('[\x00/]')
26 def sanitize_filename(dirty):
27 """Replace disallowed filename characters with harmless "_"."""
37 return _disallowed_filename_characters.sub('_', dirty)
40 class Directory(FreshBase):
41 """Generic directory object, backed by a dict.
43 Consists of a set of entries with the key representing the filename
44 and the value referencing a File or Directory object.
47 def __init__(self, parent_inode, inodes):
48 super(Directory, self).__init__()
50 """parent_inode is the integer inode number"""
52 if not isinstance(parent_inode, int):
53 raise Exception("parent_inode should be an int")
54 self.parent_inode = parent_inode
57 self._mtime = time.time()
59 # Overridden by subclasses to implement logic to update the entries dict
60 # when the directory is stale
65 # Only used when computing the size of the disk footprint of the directory
73 def checkupdate(self):
77 except apiclient.errors.HttpError as e:
81 def __getitem__(self, item):
83 return self._entries[item]
88 return list(self._entries.items())
91 def __contains__(self, k):
93 return k in self._entries
98 return len(self._entries)
101 self.inodes.touch(self)
102 super(Directory, self).fresh()
104 def merge(self, items, fn, same, new_entry):
105 """Helper method for updating the contents of the directory.
107 Takes a list describing the new contents of the directory, reuse
108 entries that are the same in both the old and new lists, create new
109 entries, and delete old entries missing from the new list.
111 :items: iterable with new directory contents
113 :fn: function to take an entry in 'items' and return the desired file or
114 directory name, or None if this entry should be skipped
116 :same: function to compare an existing entry (a File or Directory
117 object) with an entry in the items list to determine whether to keep
120 :new_entry: function to create a new directory entry (File or Directory
121 object) from an entry in the items list.
125 oldentries = self._entries
129 name = sanitize_filename(fn(i))
131 if name in oldentries and same(oldentries[name], i):
132 # move existing directory entry over
133 self._entries[name] = oldentries[name]
136 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
137 # create new directory entry
140 self._entries[name] = self.inodes.add_entry(ent)
143 # delete any other directory entries that were not found in 'items'
145 _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
146 llfuse.invalidate_entry(self.inode, str(i))
147 self.inodes.del_entry(oldentries[i])
151 llfuse.invalidate_inode(self.inode)
152 self._mtime = time.time()
156 def clear(self, force=False):
157 """Delete all entries"""
159 if not self.in_use() or force:
160 oldentries = self._entries
163 if not oldentries[n].clear(force):
164 self._entries = oldentries
167 llfuse.invalidate_entry(self.inode, str(n))
168 self.inodes.del_entry(oldentries[n])
169 llfuse.invalidate_inode(self.inode)
def create(self, name):
    """Create a file called `name` in this directory.

    The generic directory does not support this operation and always
    raises NotImplementedError; subclasses that do support it override
    this method.
    """
    raise NotImplementedError
def mkdir(self, name):
    """Create a subdirectory called `name` in this directory.

    The generic directory does not support this operation and always
    raises NotImplementedError; subclasses that do support it override
    this method.
    """
    raise NotImplementedError
def unlink(self, name):
    """Remove the file called `name` from this directory.

    The generic directory does not support this operation and always
    raises NotImplementedError; subclasses that do support it override
    this method.
    """
    raise NotImplementedError
def rmdir(self, name):
    """Remove the subdirectory called `name` from this directory.

    The generic directory does not support this operation and always
    raises NotImplementedError; subclasses that do support it override
    this method.
    """
    raise NotImplementedError
def rename(self, name_old, name_new, src):
    """Move entry `name_old` of directory `src` into this directory as `name_new`.

    The generic directory does not support this operation and always
    raises NotImplementedError; subclasses that do support it override
    this method.
    """
    raise NotImplementedError
199 class CollectionDirectoryBase(Directory):
def __init__(self, parent_inode, inodes, collection):
    """Set up the directory and remember the collection object backing it.

    `parent_inode` and `inodes` are forwarded unchanged to the base
    Directory initializer; `collection` is stored as `self.collection`.
    """
    super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
    # The collection (or sub-collection) whose items this directory exposes.
    self.collection = collection
204 def new_entry(self, name, item, mtime):
205 name = sanitize_filename(name)
206 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
207 if item.fuse_entry.dead is not True:
208 raise Exception("Can only reparent dead inode entry")
209 if item.fuse_entry.inode is None:
210 raise Exception("Reparented entry must still have valid inode")
211 item.fuse_entry.dead = False
212 self._entries[name] = item.fuse_entry
213 elif isinstance(item, arvados.collection.RichCollectionBase):
214 self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
215 self._entries[name].populate(mtime)
217 self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
218 item.fuse_entry = self._entries[name]
220 def on_event(self, event, collection, name, item):
221 if collection == self.collection:
222 _logger.debug("%s %s %s %s", event, collection, name, item)
224 if event == arvados.collection.ADD:
225 self.new_entry(name, item, self.mtime())
226 elif event == arvados.collection.DEL:
227 ent = self._entries[name]
228 del self._entries[name]
229 llfuse.invalidate_entry(self.inode, name)
230 self.inodes.del_entry(ent)
231 elif event == arvados.collection.MOD:
232 if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
233 llfuse.invalidate_inode(item.fuse_entry.inode)
234 elif name in self._entries:
235 llfuse.invalidate_inode(self._entries[name].inode)
237 def populate(self, mtime):
239 self.collection.subscribe(self.on_event)
240 for entry, item in self.collection.items():
241 self.new_entry(entry, item, self.mtime())
244 return self.collection.writable()
247 with llfuse.lock_released:
248 self.collection.root_collection().save()
def create(self, name):
    """Create an empty file called `name` in the backing collection.

    The file is opened for writing and closed straight away, all inside
    the `llfuse.lock_released` context.
    """
    with llfuse.lock_released:
        new_file = self.collection.open(name, "w")
        new_file.close()
def mkdir(self, name):
    """Create a subdirectory called `name` in the backing collection.

    Delegates to the collection's `mkdirs` inside the
    `llfuse.lock_released` context.
    """
    with llfuse.lock_released:
        backing = self.collection
        backing.mkdirs(name)
258 def unlink(self, name):
259 with llfuse.lock_released:
260 self.collection.remove(name)
263 def rmdir(self, name):
264 with llfuse.lock_released:
265 self.collection.remove(name)
268 def rename(self, name_old, name_new, src):
269 if not isinstance(src, CollectionDirectoryBase):
270 raise llfuse.FUSEError(errno.EPERM)
275 if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
277 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
279 raise llfuse.FUSEError(errno.ENOTEMPTY)
280 elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
281 raise llfuse.FUSEError(errno.ENOTDIR)
282 elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
283 raise llfuse.FUSEError(errno.EISDIR)
285 with llfuse.lock_released:
286 self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
291 class CollectionDirectory(CollectionDirectoryBase):
292 """Represents the root of a directory tree holding a collection."""
294 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
295 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
297 self.num_retries = num_retries
298 self.collection_record_file = None
299 self.collection_record = None
300 if isinstance(collection_record, dict):
301 self.collection_locator = collection_record['uuid']
302 self._mtime = convertTime(collection_record.get('modified_at'))
304 self.collection_locator = collection_record
306 self._manifest_size = 0
307 if self.collection_locator:
308 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
309 self._updating_lock = threading.Lock()
312 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
315 return self.collection.writable() if self.collection is not None else self._writable
317 # Used by arv-web.py to switch the contents of the CollectionDirectory
318 def change_collection(self, new_locator):
319 """Switch the contents of the CollectionDirectory.
321 Must be called with llfuse.lock held.
324 self.collection_locator = new_locator
325 self.collection_record = None
328 def new_collection(self, new_collection_record, coll_reader):
330 self.clear(force=True)
332 self.collection_record = new_collection_record
334 if self.collection_record:
335 self._mtime = convertTime(self.collection_record.get('modified_at'))
336 self.collection_locator = self.collection_record["uuid"]
337 if self.collection_record_file is not None:
338 self.collection_record_file.update(self.collection_record)
340 self.collection = coll_reader
341 self.populate(self.mtime())
344 return self.collection_locator
348 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
351 if self.collection_locator is None:
356 with llfuse.lock_released:
357 self._updating_lock.acquire()
361 _logger.debug("Updating %s", self.collection_locator)
363 self.collection.update()
365 if uuid_pattern.match(self.collection_locator):
366 coll_reader = arvados.collection.Collection(
367 self.collection_locator, self.api, self.api.keep,
368 num_retries=self.num_retries)
370 coll_reader = arvados.collection.CollectionReader(
371 self.collection_locator, self.api, self.api.keep,
372 num_retries=self.num_retries)
373 new_collection_record = coll_reader.api_response() or {}
374 # If the Collection only exists in Keep, there will be no API
375 # response. Fill in the fields we need.
376 if 'uuid' not in new_collection_record:
377 new_collection_record['uuid'] = self.collection_locator
378 if "portable_data_hash" not in new_collection_record:
379 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
380 if 'manifest_text' not in new_collection_record:
381 new_collection_record['manifest_text'] = coll_reader.manifest_text()
383 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
384 self.new_collection(new_collection_record, coll_reader)
386 self._manifest_size = len(coll_reader.manifest_text())
387 _logger.debug("%s manifest_size %i", self, self._manifest_size)
388 # end with llfuse.lock_released, re-acquire lock
393 self._updating_lock.release()
394 except arvados.errors.NotFoundError:
395 _logger.exception("arv-mount %s: error", self.collection_locator)
396 except arvados.errors.ArgumentError as detail:
397 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
398 if self.collection_record is not None and "manifest_text" in self.collection_record:
399 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
401 _logger.exception("arv-mount %s: error", self.collection_locator)
402 if self.collection_record is not None and "manifest_text" in self.collection_record:
403 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
406 def __getitem__(self, item):
408 if item == '.arvados#collection':
409 if self.collection_record_file is None:
410 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
411 self.inodes.add_entry(self.collection_record_file)
412 return self.collection_record_file
414 return super(CollectionDirectory, self).__getitem__(item)
416 def __contains__(self, k):
417 if k == '.arvados#collection':
420 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Forget the cached collection record, then invalidate as usual.

    Both `collection_record` and `collection_record_file` are reset to
    None before the base-class `invalidate()` runs.
    """
    self.collection_record = self.collection_record_file = None
    super(CollectionDirectory, self).invalidate()
428 return (self.collection_locator is not None)
431 # This is an empirically-derived heuristic to estimate the memory used
432 # to store this collection's metadata. Calculating the memory
433 # footprint directly would be more accurate, but also more complicated.
434 return self._manifest_size * 128
436 class MagicDirectory(Directory):
437 """A special directory that logically contains the set of all extant keep locators.
439 When a file is referenced by lookup(), it is tested to see if it is a valid
440 keep locator to a manifest, and if so, loads the manifest contents as a
441 subdirectory of this directory with the locator as the directory name.
442 Since querying a list of all extant keep locators is impractical, only
443 collections that have already been accessed are visible to readdir().
448 This directory provides access to Arvados collections as subdirectories listed
449 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
450 the form '1234567890abcdefghijklmnopqrstuv+123').
452 Note that this directory will appear empty until you attempt to access a
453 specific collection subdirectory (such as trying to 'cd' into it), at which
454 point the collection will actually be looked up on the server and the directory
455 will appear if it exists.
458 def __init__(self, parent_inode, inodes, api, num_retries):
459 super(MagicDirectory, self).__init__(parent_inode, inodes)
461 self.num_retries = num_retries
def __setattr__(self, name, value):
    """Set an attribute, populating the directory when an inode is assigned.

    After the normal attribute assignment, if the attribute just set is
    `inode`, its value is not None, and the directory has no entries
    yet, a README entry is added. When the inode is the FUSE root inode,
    a nested MagicDirectory named `by_id` is added as well.
    """
    super(MagicDirectory, self).__setattr__(name, value)

    # Only the first non-None inode assignment on an empty directory
    # triggers population; anything else is a plain attribute set.
    if name != 'inode' or self.inode is None or self._entries:
        return

    self._entries['README'] = self.inodes.add_entry(
        StringFile(self.inode, self.README_TEXT, time.time()))

    # The root directory additionally gets an identical by_id subdirectory.
    if self.inode == llfuse.ROOT_INODE:
        self._entries['by_id'] = self.inodes.add_entry(
            MagicDirectory(self.inode, self.inodes, self.api, self.num_retries))
475 def __contains__(self, k):
476 if k in self._entries:
479 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
483 e = self.inodes.add_entry(CollectionDirectory(
484 self.inode, self.inodes, self.api, self.num_retries, k))
491 except Exception as e:
492 _logger.debug('arv-mount exception keep %s', e)
495 def __getitem__(self, item):
497 return self._entries[item]
499 raise KeyError("No collection with id " + item)
501 def clear(self, force=False):
505 class RecursiveInvalidateDirectory(Directory):
506 def invalidate(self):
508 super(RecursiveInvalidateDirectory, self).invalidate()
509 for a in self._entries:
510 self._entries[a].invalidate()
515 class TagsDirectory(RecursiveInvalidateDirectory):
516 """A special directory that contains as subdirectories all tags visible to the user."""
518 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
519 super(TagsDirectory, self).__init__(parent_inode, inodes)
521 self.num_retries = num_retries
523 self._poll_time = poll_time
526 with llfuse.lock_released:
527 tags = self.api.links().list(
528 filters=[['link_class', '=', 'tag']],
529 select=['name'], distinct=True
530 ).execute(num_retries=self.num_retries)
532 self.merge(tags['items'],
534 lambda a, i: a.tag == i['name'],
535 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
538 class TagDirectory(Directory):
539 """A special directory that contains as subdirectories all collections visible
540 to the user that are tagged with a particular tag.
543 def __init__(self, parent_inode, inodes, api, num_retries, tag,
544 poll=False, poll_time=60):
545 super(TagDirectory, self).__init__(parent_inode, inodes)
547 self.num_retries = num_retries
550 self._poll_time = poll_time
553 with llfuse.lock_released:
554 taggedcollections = self.api.links().list(
555 filters=[['link_class', '=', 'tag'],
556 ['name', '=', self.tag],
557 ['head_uuid', 'is_a', 'arvados#collection']],
559 ).execute(num_retries=self.num_retries)
560 self.merge(taggedcollections['items'],
561 lambda i: i['head_uuid'],
562 lambda a, i: a.collection_locator == i['head_uuid'],
563 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
566 class ProjectDirectory(Directory):
567 """A special directory that contains the contents of a project."""
569 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
570 poll=False, poll_time=60):
571 super(ProjectDirectory, self).__init__(parent_inode, inodes)
573 self.num_retries = num_retries
574 self.project_object = project_object
575 self.project_object_file = None
576 self.project_uuid = project_object['uuid']
578 self._poll_time = poll_time
579 self._updating_lock = threading.Lock()
580 self._current_user = None
582 def createDirectory(self, i):
583 if collection_uuid_pattern.match(i['uuid']):
584 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
585 elif group_uuid_pattern.match(i['uuid']):
586 return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
587 elif link_uuid_pattern.match(i['uuid']):
588 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
589 return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
592 elif uuid_pattern.match(i['uuid']):
593 return ObjectFile(self.parent_inode, i)
598 return self.project_uuid
601 if self.project_object_file == None:
602 self.project_object_file = ObjectFile(self.inode, self.project_object)
603 self.inodes.add_entry(self.project_object_file)
607 if i['name'] is None or len(i['name']) == 0:
609 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
610 # collection or subproject
612 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
615 elif 'kind' in i and i['kind'].startswith('arvados#'):
617 return "{}.{}".format(i['name'], i['kind'][8:])
622 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
623 return a.uuid() == i['uuid']
624 elif isinstance(a, ObjectFile):
625 return a.uuid() == i['uuid'] and not a.stale()
629 with llfuse.lock_released:
630 self._updating_lock.acquire()
634 if group_uuid_pattern.match(self.project_uuid):
635 self.project_object = self.api.groups().get(
636 uuid=self.project_uuid).execute(num_retries=self.num_retries)
637 elif user_uuid_pattern.match(self.project_uuid):
638 self.project_object = self.api.users().get(
639 uuid=self.project_uuid).execute(num_retries=self.num_retries)
641 contents = arvados.util.list_all(self.api.groups().contents,
642 self.num_retries, uuid=self.project_uuid)
644 # end with llfuse.lock_released, re-acquire lock
649 self.createDirectory)
651 self._updating_lock.release()
653 def __getitem__(self, item):
655 if item == '.arvados#project':
656 return self.project_object_file
658 return super(ProjectDirectory, self).__getitem__(item)
660 def __contains__(self, k):
661 if k == '.arvados#project':
664 return super(ProjectDirectory, self).__contains__(k)
667 with llfuse.lock_released:
668 if not self._current_user:
669 self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
670 return self._current_user["uuid"] in self.project_object["writable_by"]
675 def mkdir(self, name):
677 with llfuse.lock_released:
678 self.api.collections().create(body={"owner_uuid": self.project_uuid,
680 "manifest_text": ""}).execute(num_retries=self.num_retries)
682 except apiclient_errors.Error as error:
684 raise llfuse.FUSEError(errno.EEXIST)
686 def rmdir(self, name):
688 raise llfuse.FUSEError(errno.ENOENT)
689 if not isinstance(self[name], CollectionDirectory):
690 raise llfuse.FUSEError(errno.EPERM)
691 if len(self[name]) > 0:
692 raise llfuse.FUSEError(errno.ENOTEMPTY)
693 with llfuse.lock_released:
694 self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
697 def rename(self, name_old, name_new, src):
698 if not isinstance(src, ProjectDirectory):
699 raise llfuse.FUSEError(errno.EPERM)
703 if not isinstance(ent, CollectionDirectory):
704 raise llfuse.FUSEError(errno.EPERM)
707 # POSIX semantics for replacing one directory with another is
708 # tricky (the target directory must be empty, the operation must be
709 # atomic which isn't possible with the Arvados API as of this
710 # writing) so don't support that.
711 raise llfuse.FUSEError(errno.EPERM)
713 self.api.collections().update(uuid=ent.uuid(),
714 body={"owner_uuid": self.uuid(),
715 "name": name_new}).execute(num_retries=self.num_retries)
717 # Actually move the entry from source directory to this directory.
718 del src._entries[name_old]
719 self._entries[name_new] = ent
720 llfuse.invalidate_entry(src.inode, name_old)
722 class SharedDirectory(Directory):
723 """A special directory that represents users or groups who have shared projects with me."""
725 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
726 poll=False, poll_time=60):
727 super(SharedDirectory, self).__init__(parent_inode, inodes)
729 self.num_retries = num_retries
730 self.current_user = api.users().current().execute(num_retries=num_retries)
732 self._poll_time = poll_time
735 with llfuse.lock_released:
736 all_projects = arvados.util.list_all(
737 self.api.groups().list, self.num_retries,
738 filters=[['group_class','=','project']])
740 for ob in all_projects:
741 objects[ob['uuid']] = ob
745 for ob in all_projects:
746 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
748 root_owners[ob['owner_uuid']] = True
750 lusers = arvados.util.list_all(
751 self.api.users().list, self.num_retries,
752 filters=[['uuid','in', list(root_owners)]])
753 lgroups = arvados.util.list_all(
754 self.api.groups().list, self.num_retries,
755 filters=[['uuid','in', list(root_owners)]])
761 objects[l["uuid"]] = l
763 objects[l["uuid"]] = l
766 for r in root_owners:
770 contents[obr["name"]] = obr
771 #elif obr.get("username"):
772 # contents[obr["username"]] = obr
773 elif "first_name" in obr:
774 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
778 if r['owner_uuid'] not in objects:
779 contents[r['name']] = r
781 # end with llfuse.lock_released, re-acquire lock
784 self.merge(contents.items(),
786 lambda a, i: a.uuid() == i[1]['uuid'],
787 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))