2 # FUSE driver for Arvados Keep
24 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
# Module-level logger shared by everything in this FUSE driver.
_logger = logging.getLogger('arvados.arvados_fuse')

# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')
class SafeApi(object):
    '''Threadsafe wrapper for API object. This stores and returns a different api
    object per thread, because httplib2 which underlies apiclient is not

    def __init__(self, config):
        # Connection settings are read once from `config`; each thread then
        # lazily constructs its own API / Keep client from them.
        self.host = config.get('ARVADOS_API_HOST')
        self.api_token = config.get('ARVADOS_API_TOKEN')
        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        # Per-thread storage for the api/keep client objects.
        self.local = threading.local()
        # The block cache is shared by all per-thread KeepClients.
        self.block_cache = arvados.KeepBlockCache()

        # Build the API client for this thread on first use.
        if 'api' not in self.local.__dict__:
            self.local.api = arvados.api('v1', False, self.host,
                                         self.api_token, self.insecure)

        # Build the KeepClient for this thread on first use, sharing the
        # process-wide block cache.
        if 'keep' not in self.local.__dict__:
            self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
        return self.local.keep

    def __getattr__(self, name):
        # Proxy nonexistent attributes to the local API client.
            return getattr(self.localapi(), name)
        except AttributeError:
            # Fall back to normal attribute-error behavior.
            return super(SafeApi, self).__getattr__(name)
    '''Parse Arvados timestamp to unix time.'''
    # Timestamps look like "2014-01-01T12:34:56Z"; timegm() interprets the
    # parsed struct_time as UTC, matching the trailing "Z".
        return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
    except (TypeError, ValueError):
def sanitize_filename(dirty):
    '''Replace disallowed filename characters with harmless "_".

    Character substitution alone is not enough: the empty string, "." and
    ".." are not usable as directory entry names at all, so they are mapped
    to "_" / "_" / "__" explicitly.  None passes through unchanged so callers
    can signal "no name".
    '''
    if dirty is None:
        return None
    elif dirty == '':
        # An empty name cannot appear in a directory listing.
        return '_'
    elif dirty == '.':
        # "." always means the directory itself.
        return '_'
    elif dirty == '..':
        # ".." always means the parent directory.
        return '__'
    else:
        # NUL and "/" are the only bytes Linux/FUSE cannot accept in a name.
        return re.sub('[\x00/]', '_', dirty)
class FreshBase(object):
    '''Base class for maintaining fresh/stale state to determine when to update.'''
        # _last_update: when the contents were last refreshed.
        # _atime: when the contents were last accessed (drives staleness).
        self._last_update = time.time()
        self._atime = time.time()

    # Mark the value as stale
    # Test if the entries dict is stale.
        # Stale when the entry has been accessed more recently than
        # (last update + poll window), i.e. a reader may see old data.
        return (self._last_update + self._poll_time) < self._atime
        self._last_update = time.time()
class File(FreshBase):
    '''Base for file objects.'''

    def __init__(self, parent_inode, _mtime=0):
        super(File, self).__init__()
        # Inode number of the directory containing this file.
        self.parent_inode = parent_inode

    def readfrom(self, off, size):
class StreamReaderFile(File):
    '''Wraps a StreamFileReader as a file.'''

    def __init__(self, parent_inode, reader, _mtime):
        super(StreamReaderFile, self).__init__(parent_inode, _mtime)

        # Size and reads are delegated to the underlying StreamFileReader.
        return self.reader.size()

    def readfrom(self, off, size):
        # May block on Keep I/O; callers release the llfuse lock around this.
        return self.reader.readfrom(off, size)
class StringFile(File):
    '''Wrap a simple string as a file'''
    def __init__(self, parent_inode, contents, _mtime):
        super(StringFile, self).__init__(parent_inode, _mtime)
        # Entire file body held in memory as a plain string.
        self.contents = contents

        # File size is just the string length.
        return len(self.contents)

    def readfrom(self, off, size):
        # Plain slice; an offset past the end yields '' like a real read().
        return self.contents[off:(off+size)]
class ObjectFile(StringFile):
    '''Wrap a dict as a serialized json object.'''

    def __init__(self, parent_inode, obj):
        # Start with empty contents; they are filled in by update().
        super(ObjectFile, self).__init__(parent_inode, "", 0)
        self.uuid = obj['uuid']

    def update(self, obj):
        # Track the API record's modified_at as the file mtime when present.
        self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
        # Pretty-printed, key-sorted JSON so the file content is stable.
        self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
class Directory(FreshBase):
    '''Generic directory object, backed by a dict.
    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.

    def __init__(self, parent_inode):
        super(Directory, self).__init__()

        '''parent_inode is the integer inode number'''

        # NOTE(review): a bare Exception is raised here; a TypeError would be
        # more conventional for a bad argument type.
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self._mtime = time.time()

    # Overriden by subclasses to implement logic to update the entries dict
    # when the directory is stale

    # Only used when computing the size of the disk footprint of the directory

    def checkupdate(self):
        # API errors during a refresh are tolerated; stale contents remain.
        except apiclient.errors.HttpError as e:

    def __getitem__(self, item):
        return self._entries[item]

        return self._entries.items()

        # Python 2 dict API (iterkeys).
        return self._entries.iterkeys()

    def __contains__(self, k):
        return k in self._entries

    def merge(self, items, fn, same, new_entry):
        '''Helper method for updating the contents of the directory. Takes a list
        describing the new contents of the directory, reuse entries that are
        the same in both the old and new lists, create new entries, and delete
        old entries missing from the new list.

        items: iterable with new directory contents

        fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep

        new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.

        # Keep the old mapping around so existing entries can be reused.
        oldentries = self._entries
            name = sanitize_filename(fn(i))
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    # create new directory entry
                        self._entries[name] = self.inodes.add_entry(ent)

        # delete any other directory entries that were not in found in 'items'
            # Tell the kernel the dentry is gone, then drop the inode.
            llfuse.invalidate_entry(self.inode, str(i))
            self.inodes.del_entry(oldentries[i])
            self._mtime = time.time()

        '''Delete all entries'''
        oldentries = self._entries
            if isinstance(n, Directory):
                llfuse.invalidate_entry(self.inode, str(n))
            self.inodes.del_entry(oldentries[n])
        llfuse.invalidate_inode(self.inode)
class CollectionDirectory(Directory):
    '''Represents the root of a directory tree holding a collection.'''

    def __init__(self, parent_inode, inodes, api, num_retries, collection):
        super(CollectionDirectory, self).__init__(parent_inode)
        self.num_retries = num_retries
        # Lazily-created ObjectFile exposing the raw API record
        # (see '.arvados#collection' in __getitem__ below).
        self.collection_object_file = None
        self.collection_object = None
        # `collection` may be a full API record (dict) or just a locator string.
        if isinstance(collection, dict):
            self.collection_locator = collection['uuid']
            self.collection_locator = collection

        # An item matches if it refers to this collection by uuid or by
        # portable data hash.
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def change_collection(self, new_locator):
        # Switch to a different collection; contents refresh on next update.
        self.collection_locator = new_locator
        self.collection_object = None

    def new_collection(self, new_collection_object, coll_reader):
        self.collection_object = new_collection_object

        if self.collection_object_file is not None:
            self.collection_object_file.update(self.collection_object)

        # Rebuild the directory tree from the manifest's streams, creating
        # intermediate Directory nodes for each path component.
        for s in coll_reader.all_streams():
            for part in s.name().split('/'):
                if part != '' and part != '.':
                    partname = sanitize_filename(part)
                    if partname not in cwd._entries:
                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
                    cwd = cwd._entries[partname]
            for k, v in s.files().items():
                cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))

            # A collection addressed by portable data hash is immutable;
            # once loaded there is nothing to refresh.
            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):

            # Release the FUSE lock while talking to the API server / Keep.
            with llfuse.lock_released:
                coll_reader = arvados.CollectionReader(
                    self.collection_locator, self.api, self.api.localkeep(),
                    num_retries=self.num_retries)
                new_collection_object = coll_reader.api_response() or {}
                # If the Collection only exists in Keep, there will be no API
                # response. Fill in the fields we need.
                if 'uuid' not in new_collection_object:
                    new_collection_object['uuid'] = self.collection_locator
                if "portable_data_hash" not in new_collection_object:
                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
                if 'manifest_text' not in new_collection_object:
                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
                coll_reader.normalize()
            # end with llfuse.lock_released, re-acquire lock

            # Only rebuild the tree when the content actually changed.
            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                self.new_collection(new_collection_object, coll_reader)

        except apiclient.errors.NotFoundError:
            _logger.exception("arv-mount %s: error", self.collection_locator)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])

    def __getitem__(self, item):
        # Magic entry exposing the raw collection record as JSON.
        if item == '.arvados#collection':
            if self.collection_object_file is None:
                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
                self.inodes.add_entry(self.collection_object_file)
            return self.collection_object_file
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return super(CollectionDirectory, self).__contains__(k)

        # mtime mirrors the API record's modified_at when known, else 0.
        return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
class MagicDirectory(Directory):
    '''A special directory that logically contains the set of all extant keep
    locators. When a file is referenced by lookup(), it is tested to see if it
    is a valid keep locator to a manifest, and if so, loads the manifest
    contents as a subdirectory of this directory with the locator as the
    directory name. Since querying a list of all extant keep locators is
    impractical, only collections that have already been accessed are visible

This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.

    def __init__(self, parent_inode, inodes, api, num_retries):
        super(MagicDirectory, self).__init__(parent_inode)
        self.num_retries = num_retries

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
            (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries))

    def __contains__(self, k):
        # Already loaded previously?
        if k in self._entries:

        # Reject anything that cannot be a collection uuid or data hash
        # before bothering the API server.
        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):

            # Try to materialize the collection as a subdirectory on demand.
            e = self.inodes.add_entry(CollectionDirectory(
                self.inode, self.inodes, self.api, self.num_retries, k))
        # NOTE(review): broad Exception catch; any failure is treated as
        # "collection does not exist" and only logged at debug level.
        except Exception as e:
            _logger.debug('arv-mount exception keep %s', e)

    def __getitem__(self, item):
            # __contains__ above has the side effect of loading the entry.
            return self._entries[item]
            raise KeyError("No collection with id " + item)
class RecursiveInvalidateDirectory(Directory):
    def invalidate(self):
        # Only the root directory needs to take the global llfuse lock;
        # recursive calls below run under the lock already held here.
        if self.inode == llfuse.ROOT_INODE:
            llfuse.lock.acquire()
            # Invalidate self, then every child entry recursively.
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
            if self.inode == llfuse.ROOT_INODE:
                llfuse.lock.release()
class TagsDirectory(RecursiveInvalidateDirectory):
    '''A special directory that contains as subdirectories all tags visible to the user.'''

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode)
        self.num_retries = num_retries
        self._poll_time = poll_time

        # Fetch the distinct tag names without holding the FUSE lock.
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                ).execute(num_retries=self.num_retries)
        # One TagDirectory per distinct tag name.
        self.merge(tags['items'],
                   lambda i: i['name'] if 'name' in i else i['uuid'],
                   lambda a, i: a.tag == i,
                   lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
class TagDirectory(Directory):
    '''A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode)
        self.num_retries = num_retries
        self._poll_time = poll_time

        # List collections carrying this tag, without holding the FUSE lock.
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                ).execute(num_retries=self.num_retries)
        # One CollectionDirectory per tagged collection (keyed by head_uuid).
        self.merge(taggedcollections['items'],
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
class ProjectDirectory(Directory):
    '''A special directory that contains the contents of a project.'''

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        super(ProjectDirectory, self).__init__(parent_inode)
        self.num_retries = num_retries
        self.project_object = project_object
        # Lazily-created ObjectFile for the '.arvados#project' magic entry.
        self.project_object_file = None
        self.uuid = project_object['uuid']
        self._poll_time = poll_time

    def createDirectory(self, i):
        # Map an API record to the right entry type by its uuid pattern.
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            # Subproject: recurse with the same polling configuration.
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                # Name link pointing at a collection.
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
        elif uuid_pattern.match(i['uuid']):
            # Any other Arvados object appears as a JSON file.
            return ObjectFile(self.parent_inode, i)

        # NOTE(review): prefer "is None" over "== None".
        if self.project_object_file == None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

            # Decide the visible name for an item, or None to skip it.
                if i['name'] is None or len(i['name']) == 0:
                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                    # collection or subproject
                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    # e.g. "name.job", "name.pipeline_instance"
                    return "{}.{}".format(i['name'], i['kind'][8:])

            # An existing entry is reusable if it refers to the same object.
            if isinstance(a, CollectionDirectory):
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid == i['uuid'] and not a.stale()

        # Refresh the project record and its contents off the FUSE lock.
        with llfuse.lock_released:
            if group_uuid_pattern.match(self.uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.uuid):
                # A user's home project is the user record itself.
                self.project_object = self.api.users().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)

            contents = arvados.util.list_all(self.api.groups().contents,
                                             self.num_retries, uuid=self.uuid)
            # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
            contents += arvados.util.list_all(
                self.api.links().list, self.num_retries,
                filters=[['tail_uuid', '=', self.uuid],
                         ['link_class', '=', 'name']])

        # end with llfuse.lock_released, re-acquire lock

                   self.createDirectory)

    def __getitem__(self, item):
        # Magic entry exposing the raw project record as JSON.
        if item == '.arvados#project':
            return self.project_object_file
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#project':
            return super(ProjectDirectory, self).__contains__(k)
class SharedDirectory(Directory):
    '''A special directory that represents users or groups who have shared projects with me.'''

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        super(SharedDirectory, self).__init__(parent_inode)
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll_time = poll_time

        # Compute, off the FUSE lock, the set of "root owners": owners of
        # projects that were shared with me but whose owner is not itself
        # one of my visible projects (and is not me).
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            for ob in all_projects:
                objects[ob['uuid']] = ob

            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    root_owners[ob['owner_uuid']] = True

            # Resolve the root owners: they may be users or groups.
            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

                objects[l["uuid"]] = l
                objects[l["uuid"]] = l

            # Build the visible name -> object mapping; users appear under
            # "First Last", groups/projects under their name.
            for r in root_owners:
                        contents[obr["name"]] = obr
                    if "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r

        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents.items(),
                   lambda a, i: a.uuid == i[1]['uuid'],
                   lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
class FileHandle(object):
    '''Connects a numeric file handle to a File or Directory object that has
    been opened by the client.'''

    def __init__(self, fh, entry):
class Inodes(object):
    '''Manage the set of inodes. This is the mapping from a numeric id
    to a concrete File or Directory object'''
        # Inode numbers are handed out sequentially starting at the root.
        self._counter = itertools.count(llfuse.ROOT_INODE)

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

        # Python 2 dict API (iterkeys).
        return self._entries.iterkeys()

        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def add_entry(self, entry):
        # Assign the next free inode number and register the entry.
        entry.inode = next(self._counter)
        self._entries[entry.inode] = entry

    def del_entry(self, entry):
        # Tell the kernel the inode is gone before forgetting it.
        llfuse.invalidate_inode(entry.inode)
        del self._entries[entry.inode]
class Operations(llfuse.Operations):
    '''This is the main interface with llfuse. The methods on this object are
    called by llfuse threads to service FUSE events to query and read from

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using "with llfuse.lock_released:"'''

    def __init__(self, uid, gid, encoding="utf-8"):
        super(Operations, self).__init__()

        self.inodes = Inodes()
        # Encoding used to translate between kernel byte names and unicode.
        self.encoding = encoding

        # dict of inode to filehandle
        self._filehandles = {}
        self._filehandles_counter = 1

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

        # Allow threads that are waiting for the driver to be finished
        # initializing to continue

    def access(self, inode, mode, ctx):

    def getattr(self, inode):
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        # Let the kernel cache attributes/dentries for 5 minutes.
        entry.entry_timeout = 300
        entry.attr_timeout = 300

        # Everything in this mount is read-only.
        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        elif isinstance(e, StreamReaderFile):
            # Collection files are marked executable as well as readable.
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
            entry.st_mode |= stat.S_IFREG

        entry.st_uid = self.uid
        entry.st_gid = self.gid

        entry.st_size = e.size()

        entry.st_blksize = 512
        # NOTE: Python 2 integer division; rounds the block count down, +1.
        entry.st_blocks = (e.size()/512)+1
        entry.st_atime = int(e.atime())
        entry.st_mtime = int(e.mtime())
        entry.st_ctime = int(e.mtime())

    def lookup(self, parent_inode, name):
        # Python 2 builtin: decode the kernel-supplied byte name.
        name = unicode(name, self.encoding)
        _logger.debug("arv-mount lookup: parent_inode %i name %s",

        if parent_inode in self.inodes:
            p = self.inodes[parent_inode]
                # '..' resolves to the parent inode.
                inode = p.parent_inode
            elif isinstance(p, Directory) and name in p:
                inode = p[name].inode

            return self.getattr(inode)

            raise llfuse.FUSEError(errno.ENOENT)

    def open(self, inode, flags):
        if inode in self.inodes:
            p = self.inodes[inode]
            raise llfuse.FUSEError(errno.ENOENT)

        # Read-only filesystem: refuse any write access mode.
        if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
            raise llfuse.FUSEError(errno.EROFS)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        # Allocate the next numeric file handle.
        fh = self._filehandles_counter
        self._filehandles_counter += 1
        self._filehandles[fh] = FileHandle(fh, p)

    def read(self, fh, off, size):
        _logger.debug("arv-mount read %i %i %i", fh, off, size)
        if fh in self._filehandles:
            handle = self._filehandles[fh]
            raise llfuse.FUSEError(errno.EBADF)

        # Record the access time for staleness tracking.
        handle.entry._atime = time.time()

            # Release the FUSE lock during (possibly slow) Keep reads.
            with llfuse.lock_released:
                return handle.entry.readfrom(off, size)
        except arvados.errors.NotFoundError as e:
            _logger.warning("Block not found: " + str(e))
            raise llfuse.FUSEError(errno.EIO)
            raise llfuse.FUSEError(errno.EIO)

    def release(self, fh):
        if fh in self._filehandles:
            del self._filehandles[fh]

    def opendir(self, inode):
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
            raise llfuse.FUSEError(errno.EIO)

        p._atime = time.time()

        # Snapshot the listing (including '.' and '..') for this handle so
        # readdir sees a stable view even if the directory updates.
        self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))

    def readdir(self, fh, off):
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
            raise llfuse.FUSEError(errno.EBADF)

        _logger.debug("arv-mount handle.entry %s", handle.entry)

        while e < len(handle.entry):
            if handle.entry[e][1].inode in self.inodes:
                    # Skip names that cannot be encoded for the kernel.
                    yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
                except UnicodeEncodeError:

    def releasedir(self, fh):
        del self._filehandles[fh]

        st = llfuse.StatvfsData()
        st.f_bsize = 64 * 1024

    # The llfuse documentation recommends only overloading functions that
    # are actually implemented, as the default implementation will raise ENOSYS.
    # However, there is a bug in the llfuse default implementation of create()
    # "create() takes exactly 5 positional arguments (6 given)" which will crash
    # The workaround is to implement it with the proper number of parameters,
    # and then everything works out.
    def create(self, p1, p2, p3, p4, p5):
        raise llfuse.FUSEError(errno.EROFS)