10 from fusefile import StringFile, ObjectFile, FuseArvadosFile
11 from fresh import FreshBase, convertTime, use_counter
13 import arvados.collection
14 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
16 _logger = logging.getLogger('arvados.arvados_fuse')
19 # Match any character which FUSE or Linux cannot accommodate as part
20 # of a filename. (If present in a collection filename, they will
21 # appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')


def sanitize_filename(dirty):
    """Replace disallowed filename characters with harmless "_".

    Besides substituting NUL and "/" characters, this handles the inputs
    that cannot be used as a directory entry name at all:

    * None is returned unchanged, so a caller can treat it as "skip this
      entry" (Directory.merge passes the result of its `fn` callback
      straight in, and `fn` is documented to return None for skipped
      entries).
    * The empty string and "." become "_".
    * ".." becomes "__".

    Any other string is returned with each disallowed character replaced
    by an underscore.
    """
    if dirty is None:
        # Nothing to sanitize; re.sub() would raise TypeError on None.
        return None
    if dirty in ('', '.'):
        # An empty name, or the "current directory" name, is not a usable
        # entry name.
        return '_'
    if dirty == '..':
        # Same for the "parent directory" name; keep the length the same.
        return '__'
    return _disallowed_filename_characters.sub('_', dirty)
38 class Directory(FreshBase):
39 """Generic directory object, backed by a dict.
41 Consists of a set of entries with the key representing the filename
42 and the value referencing a File or Directory object.
45 def __init__(self, parent_inode, inodes):
46 super(Directory, self).__init__()
48 """parent_inode is the integer inode number"""
50 if not isinstance(parent_inode, int):
51 raise Exception("parent_inode should be an int")
52 self.parent_inode = parent_inode
55 self._mtime = time.time()
57 # Overridden by subclasses to implement logic to update the entries dict
58 # when the directory is stale
63 # Only used when computing the size of the disk footprint of the directory
71 def checkupdate(self):
75 except apiclient.errors.HttpError as e:
79 def __getitem__(self, item):
81 return self._entries[item]
86 return list(self._entries.items())
89 def __contains__(self, k):
91 return k in self._entries
94 self.inodes.touch(self)
95 super(Directory, self).fresh()
97 def merge(self, items, fn, same, new_entry):
98 """Helper method for updating the contents of the directory.
100 Takes a list describing the new contents of the directory, reuse
101 entries that are the same in both the old and new lists, create new
102 entries, and delete old entries missing from the new list.
104 :items: iterable with new directory contents
106 :fn: function to take an entry in 'items' and return the desired file or
107 directory name, or None if this entry should be skipped
109 :same: function to compare an existing entry (a File or Directory
110 object) with an entry in the items list to determine whether to keep
113 :new_entry: function to create a new directory entry (File or Directory
114 object) from an entry in the items list.
118 oldentries = self._entries
122 name = sanitize_filename(fn(i))
124 if name in oldentries and same(oldentries[name], i):
125 # move existing directory entry over
126 self._entries[name] = oldentries[name]
129 _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
130 # create new directory entry
133 self._entries[name] = self.inodes.add_entry(ent)
136 # delete any other directory entries that were not found in 'items'
138 _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
139 llfuse.invalidate_entry(self.inode, str(i))
140 self.inodes.del_entry(oldentries[i])
144 llfuse.invalidate_inode(self.inode)
145 self._mtime = time.time()
149 def clear(self, force=False):
150 """Delete all entries"""
152 if not self.in_use() or force:
153 oldentries = self._entries
156 if not oldentries[n].clear(force):
157 self._entries = oldentries
160 llfuse.invalidate_entry(self.inode, str(n))
161 self.inodes.del_entry(oldentries[n])
162 llfuse.invalidate_inode(self.inode)
178 raise NotImplementedError()
181 raise NotImplementedError()
184 raise NotImplementedError()
187 raise NotImplementedError()
190 raise NotImplementedError()
192 class CollectionDirectoryBase(Directory):
def __init__(self, parent_inode, inodes, collection):
    """Set up a directory whose contents mirror `collection`.

    parent_inode and inodes are handed to the Directory constructor
    unchanged; collection is stored as self.collection, which the other
    methods of this class read and modify.
    """
    super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
    # The collection (or sub-collection) object this directory reflects.
    self.collection = collection
def new_entry(self, name, item, mtime):
    """Register `item` in this directory under a sanitized form of `name`.

    Three cases are handled:

    * `item` already carries a fuse_entry: that entry is re-attached here.
      It must currently be marked dead and must still have an inode,
      otherwise an Exception is raised.
    * `item` is an arvados.collection.RichCollectionBase: a new
      CollectionDirectoryBase is added to the inode table and populated
      with `mtime`.
    * anything else: a new FuseArvadosFile is added to the inode table.

    In every case item.fuse_entry is left pointing at the entry stored in
    self._entries.
    """
    safe_name = sanitize_filename(name)
    already_mounted = hasattr(item, "fuse_entry") and item.fuse_entry is not None

    if already_mounted:
        # Reparenting: only an entry that was marked dead but kept its
        # inode may be brought back to life under this directory.
        if item.fuse_entry.dead is not True:
            raise Exception("Can only reparent dead inode entry")
        if item.fuse_entry.inode is None:
            raise Exception("Reparented entry must still have valid inode")
        item.fuse_entry.dead = False
        self._entries[safe_name] = item.fuse_entry
    elif isinstance(item, arvados.collection.RichCollectionBase):
        subdirectory = CollectionDirectoryBase(self.inode, self.inodes, item)
        self._entries[safe_name] = self.inodes.add_entry(subdirectory)
        self._entries[safe_name].populate(mtime)
    else:
        leaf = FuseArvadosFile(self.inode, item, mtime)
        self._entries[safe_name] = self.inodes.add_entry(leaf)

    # Remember the mapping on the collection item itself so later events
    # can find the FUSE object without a name lookup.
    item.fuse_entry = self._entries[safe_name]
def on_event(self, event, collection, name, item):
    """Apply a change notification from the backing collection.

    Notifications about any collection other than self.collection are
    ignored. For the watched collection:

    * ADD creates a directory entry for `item` via new_entry().
    * DEL removes the entry called `name`, invalidates the kernel's cached
      lookup for it and drops it from the inode table. A KeyError
      propagates if no such entry exists.
    * MOD invalidates the cached inode of the affected entry, found via
      item.fuse_entry when present, else by `name` if it is listed here.
    """
    if not (collection == self.collection):
        return

    _logger.debug("%s %s %s %s", event, collection, name, item)

    if event == arvados.collection.ADD:
        self.new_entry(name, item, self.mtime())
    elif event == arvados.collection.DEL:
        # pop() both fetches and removes the entry, raising KeyError when
        # the name is unknown.
        removed = self._entries.pop(name)
        llfuse.invalidate_entry(self.inode, name)
        self.inodes.del_entry(removed)
    elif event == arvados.collection.MOD:
        item_has_entry = hasattr(item, "fuse_entry") and item.fuse_entry is not None
        if item_has_entry:
            llfuse.invalidate_inode(item.fuse_entry.inode)
        elif name in self._entries:
            llfuse.invalidate_inode(self._entries[name].inode)
230 def populate(self, mtime):
232 self.collection.subscribe(self.on_event)
233 for entry, item in self.collection.items():
234 self.new_entry(entry, item, self.mtime())
237 return self.collection.writable()
240 self.collection.root_collection().save()
def create(self, name):
    """Create an entry called `name` in the backing collection.

    The file is opened in "w" mode and closed again straight away, without
    writing anything to it.
    """
    handle = self.collection.open(name, "w")
    handle.close()
def mkdir(self, name):
    """Create a subdirectory called `name` by calling mkdirs() on the backing collection."""
    backing = self.collection
    backing.mkdirs(name)
def unlink(self, name):
    """Delete the entry called `name` by calling remove() on the backing collection."""
    backing = self.collection
    backing.remove(name)
def rmdir(self, name):
    """Delete the subdirectory called `name`.

    This issues the same remove() call on the backing collection that
    unlink() uses for files.
    """
    backing = self.collection
    backing.remove(name)
def rename(self, name_old, name_new, src):
    """Move entry `name_old` of directory `src` to `name_new` in this directory.

    src must itself be a CollectionDirectoryBase; any other source raises
    llfuse.FUSEError(errno.EPERM). The move is delegated to the backing
    collection's rename(), with overwrite=True.
    """
    if isinstance(src, CollectionDirectoryBase):
        self.collection.rename(
            name_old,
            name_new,
            source_collection=src.collection,
            overwrite=True)
    else:
        raise llfuse.FUSEError(errno.EPERM)
262 class CollectionDirectory(CollectionDirectoryBase):
263 """Represents the root of a directory tree holding a collection."""
265 def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
266 super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
268 self.num_retries = num_retries
269 self.collection_record_file = None
270 self.collection_record = None
271 if isinstance(collection_record, dict):
272 self.collection_locator = collection_record['uuid']
273 self._mtime = convertTime(collection_record.get('modified_at'))
275 self.collection_locator = collection_record
277 self._manifest_size = 0
278 if self.collection_locator:
279 self._writable = (uuid_pattern.match(self.collection_locator) is not None)
280 self._updating_lock = threading.Lock()
283 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
286 return self.collection.writable() if self.collection is not None else self._writable
288 # Used by arv-web.py to switch the contents of the CollectionDirectory
289 def change_collection(self, new_locator):
290 """Switch the contents of the CollectionDirectory.
292 Must be called with llfuse.lock held.
295 self.collection_locator = new_locator
296 self.collection_record = None
299 def new_collection(self, new_collection_record, coll_reader):
301 self.clear(force=True)
303 self.collection_record = new_collection_record
305 if self.collection_record:
306 self._mtime = convertTime(self.collection_record.get('modified_at'))
307 self.collection_locator = self.collection_record["uuid"]
308 if self.collection_record_file is not None:
309 self.collection_record_file.update(self.collection_record)
311 self.collection = coll_reader
312 self.populate(self.mtime())
315 return self.collection_locator
319 if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
322 if self.collection_locator is None:
327 with llfuse.lock_released:
328 self._updating_lock.acquire()
332 _logger.debug("Updating %s", self.collection_locator)
334 self.collection.update()
336 if uuid_pattern.match(self.collection_locator):
337 coll_reader = arvados.collection.Collection(
338 self.collection_locator, self.api, self.api.keep,
339 num_retries=self.num_retries)
341 coll_reader = arvados.collection.CollectionReader(
342 self.collection_locator, self.api, self.api.keep,
343 num_retries=self.num_retries)
344 new_collection_record = coll_reader.api_response() or {}
345 # If the Collection only exists in Keep, there will be no API
346 # response. Fill in the fields we need.
347 if 'uuid' not in new_collection_record:
348 new_collection_record['uuid'] = self.collection_locator
349 if "portable_data_hash" not in new_collection_record:
350 new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
351 if 'manifest_text' not in new_collection_record:
352 new_collection_record['manifest_text'] = coll_reader.manifest_text()
354 if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
355 self.new_collection(new_collection_record, coll_reader)
357 self._manifest_size = len(coll_reader.manifest_text())
358 _logger.debug("%s manifest_size %i", self, self._manifest_size)
359 # end with llfuse.lock_released, re-acquire lock
364 self._updating_lock.release()
365 except arvados.errors.NotFoundError:
366 _logger.exception("arv-mount %s: error", self.collection_locator)
367 except arvados.errors.ArgumentError as detail:
368 _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
369 if self.collection_record is not None and "manifest_text" in self.collection_record:
370 _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
372 _logger.exception("arv-mount %s: error", self.collection_locator)
373 if self.collection_record is not None and "manifest_text" in self.collection_record:
374 _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
377 def __getitem__(self, item):
379 if item == '.arvados#collection':
380 if self.collection_record_file is None:
381 self.collection_record_file = ObjectFile(self.inode, self.collection_record)
382 self.inodes.add_entry(self.collection_record_file)
383 return self.collection_record_file
385 return super(CollectionDirectory, self).__getitem__(item)
387 def __contains__(self, k):
388 if k == '.arvados#collection':
391 return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
    """Forget the cached collection record, then invalidate as usual.

    Both the record and the ObjectFile served as '.arvados#collection' are
    reset to None before the base-class invalidate() runs.
    """
    self.collection_record = self.collection_record_file = None
    super(CollectionDirectory, self).invalidate()
399 return (self.collection_locator is not None)
402 # This is an empirically-derived heuristic to estimate the memory used
403 # to store this collection's metadata. Calculating the memory
404 # footprint directly would be more accurate, but also more complicated.
405 return self._manifest_size * 128
407 class MagicDirectory(Directory):
408 """A special directory that logically contains the set of all extant keep locators.
410 When a file is referenced by lookup(), it is tested to see if it is a valid
411 keep locator to a manifest, and if so, loads the manifest contents as a
412 subdirectory of this directory with the locator as the directory name.
413 Since querying a list of all extant keep locators is impractical, only
414 collections that have already been accessed are visible to readdir().
419 This directory provides access to Arvados collections as subdirectories listed
420 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
421 the form '1234567890abcdefghijklmnopqrstuv+123').
423 Note that this directory will appear empty until you attempt to access a
424 specific collection subdirectory (such as trying to 'cd' into it), at which
425 point the collection will actually be looked up on the server and the directory
426 will appear if it exists.
429 def __init__(self, parent_inode, inodes, api, num_retries):
430 super(MagicDirectory, self).__init__(parent_inode, inodes)
432 self.num_retries = num_retries
def __setattr__(self, name, value):
    """Set an attribute, seeding the directory when it receives its inode.

    Every assignment is first performed normally. When the attribute being
    set is 'inode', the resulting inode is not None and the directory has
    no entries yet, a README file is added. If that inode is
    llfuse.ROOT_INODE, a nested MagicDirectory named 'by_id' is added too.
    """
    super(MagicDirectory, self).__setattr__(name, value)

    # Only the assignment of an inode to a still-empty directory triggers
    # the seeding below; any other attribute write ends here.
    if name != 'inode':
        return
    if self.inode is None or self._entries:
        return

    readme = StringFile(self.inode, self.README_TEXT, time.time())
    self._entries['README'] = self.inodes.add_entry(readme)

    # If we're the root directory, add an identical by_id subdirectory.
    if self.inode == llfuse.ROOT_INODE:
        by_id = MagicDirectory(
            self.inode, self.inodes, self.api, self.num_retries)
        self._entries['by_id'] = self.inodes.add_entry(by_id)
446 def __contains__(self, k):
447 if k in self._entries:
450 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
454 e = self.inodes.add_entry(CollectionDirectory(
455 self.inode, self.inodes, self.api, self.num_retries, k))
462 except Exception as e:
463 _logger.debug('arv-mount exception keep %s', e)
466 def __getitem__(self, item):
468 return self._entries[item]
470 raise KeyError("No collection with id " + item)
472 def clear(self, force=False):
476 class RecursiveInvalidateDirectory(Directory):
477 def invalidate(self):
479 super(RecursiveInvalidateDirectory, self).invalidate()
480 for a in self._entries:
481 self._entries[a].invalidate()
486 class TagsDirectory(RecursiveInvalidateDirectory):
487 """A special directory that contains as subdirectories all tags visible to the user."""
489 def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
490 super(TagsDirectory, self).__init__(parent_inode, inodes)
492 self.num_retries = num_retries
494 self._poll_time = poll_time
497 with llfuse.lock_released:
498 tags = self.api.links().list(
499 filters=[['link_class', '=', 'tag']],
500 select=['name'], distinct=True
501 ).execute(num_retries=self.num_retries)
503 self.merge(tags['items'],
505 lambda a, i: a.tag == i['name'],
506 lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
509 class TagDirectory(Directory):
510 """A special directory that contains as subdirectories all collections visible
511 to the user that are tagged with a particular tag.
514 def __init__(self, parent_inode, inodes, api, num_retries, tag,
515 poll=False, poll_time=60):
516 super(TagDirectory, self).__init__(parent_inode, inodes)
518 self.num_retries = num_retries
521 self._poll_time = poll_time
524 with llfuse.lock_released:
525 taggedcollections = self.api.links().list(
526 filters=[['link_class', '=', 'tag'],
527 ['name', '=', self.tag],
528 ['head_uuid', 'is_a', 'arvados#collection']],
530 ).execute(num_retries=self.num_retries)
531 self.merge(taggedcollections['items'],
532 lambda i: i['head_uuid'],
533 lambda a, i: a.collection_locator == i['head_uuid'],
534 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
537 class ProjectDirectory(Directory):
538 """A special directory that contains the contents of a project."""
540 def __init__(self, parent_inode, inodes, api, num_retries, project_object,
541 poll=False, poll_time=60):
542 super(ProjectDirectory, self).__init__(parent_inode, inodes)
544 self.num_retries = num_retries
545 self.project_object = project_object
546 self.project_object_file = None
547 self.project_uuid = project_object['uuid']
549 self._poll_time = poll_time
550 self._updating_lock = threading.Lock()
def createDirectory(self, i):
    """Build the FUSE object that represents project item `i`.

    The item's 'uuid' is matched against the known uuid patterns in order:

    * collection uuid -> CollectionDirectory built from the whole record
    * group uuid      -> nested ProjectDirectory
    * link uuid       -> CollectionDirectory for the link's 'head_uuid',
      provided 'head_kind' is 'arvados#collection' or 'head_uuid' matches
      the portable data hash pattern; otherwise None
    * any other uuid  -> ObjectFile wrapping the record

    Returns None when nothing matches.
    """
    item_uuid = i['uuid']

    if collection_uuid_pattern.match(item_uuid):
        return CollectionDirectory(
            self.inode, self.inodes, self.api, self.num_retries, i)

    if group_uuid_pattern.match(item_uuid):
        return ProjectDirectory(
            self.inode, self.inodes, self.api, self.num_retries, i,
            self._poll, self._poll_time)

    if link_uuid_pattern.match(item_uuid):
        targets_collection = (
            i['head_kind'] == 'arvados#collection'
            or portable_data_hash_pattern.match(i['head_uuid']))
        if targets_collection:
            return CollectionDirectory(
                self.inode, self.inodes, self.api, self.num_retries,
                i['head_uuid'])
        return None

    if uuid_pattern.match(item_uuid):
        # NOTE(review): this passes self.parent_inode, whereas the
        # directory objects above are created with self.inode -- confirm
        # that is intended.
        return ObjectFile(self.parent_inode, i)

    return None
568 return self.project_uuid
571 if self.project_object_file == None:
572 self.project_object_file = ObjectFile(self.inode, self.project_object)
573 self.inodes.add_entry(self.project_object_file)
577 if i['name'] is None or len(i['name']) == 0:
579 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
580 # collection or subproject
582 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
585 elif 'kind' in i and i['kind'].startswith('arvados#'):
587 return "{}.{}".format(i['name'], i['kind'][8:])
592 if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
593 return a.uuid() == i['uuid']
594 elif isinstance(a, ObjectFile):
595 return a.uuid() == i['uuid'] and not a.stale()
599 with llfuse.lock_released:
600 self._updating_lock.acquire()
604 if group_uuid_pattern.match(self.project_uuid):
605 self.project_object = self.api.groups().get(
606 uuid=self.project_uuid).execute(num_retries=self.num_retries)
607 elif user_uuid_pattern.match(self.project_uuid):
608 self.project_object = self.api.users().get(
609 uuid=self.project_uuid).execute(num_retries=self.num_retries)
611 contents = arvados.util.list_all(self.api.groups().contents,
612 self.num_retries, uuid=self.project_uuid)
614 # end with llfuse.lock_released, re-acquire lock
619 self.createDirectory)
621 self._updating_lock.release()
623 def __getitem__(self, item):
625 if item == '.arvados#project':
626 return self.project_object_file
628 return super(ProjectDirectory, self).__getitem__(item)
630 def __contains__(self, k):
631 if k == '.arvados#project':
634 return super(ProjectDirectory, self).__contains__(k)
640 class SharedDirectory(Directory):
641 """A special directory that represents users or groups who have shared projects with me."""
643 def __init__(self, parent_inode, inodes, api, num_retries, exclude,
644 poll=False, poll_time=60):
645 super(SharedDirectory, self).__init__(parent_inode, inodes)
647 self.num_retries = num_retries
648 self.current_user = api.users().current().execute(num_retries=num_retries)
650 self._poll_time = poll_time
653 with llfuse.lock_released:
654 all_projects = arvados.util.list_all(
655 self.api.groups().list, self.num_retries,
656 filters=[['group_class','=','project']])
658 for ob in all_projects:
659 objects[ob['uuid']] = ob
663 for ob in all_projects:
664 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
666 root_owners[ob['owner_uuid']] = True
668 lusers = arvados.util.list_all(
669 self.api.users().list, self.num_retries,
670 filters=[['uuid','in', list(root_owners)]])
671 lgroups = arvados.util.list_all(
672 self.api.groups().list, self.num_retries,
673 filters=[['uuid','in', list(root_owners)]])
679 objects[l["uuid"]] = l
681 objects[l["uuid"]] = l
684 for r in root_owners:
688 contents[obr["name"]] = obr
689 #elif obr.get("username"):
690 # contents[obr["username"]] = obr
691 elif "first_name" in obr:
692 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
696 if r['owner_uuid'] not in objects:
697 contents[r['name']] = r
699 # end with llfuse.lock_released, re-acquire lock
702 self.merge(contents.items(),
704 lambda a, i: a.uuid() == i[1]['uuid'],
705 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))