# FUSE driver for Arvados Keep

from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern

_logger = logging.getLogger('arvados.arvados_fuse')

# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
_disallowed_filename_characters = re.compile('[\x00/]')
class SafeApi(object):
    '''Threadsafe wrapper for API object. This stores and returns a different api
    object per thread, because httplib2 which underlies apiclient is not
    threadsafe.
    '''

    def __init__(self, config):
        self.host = config.get('ARVADOS_API_HOST')
        self.api_token = config.get('ARVADOS_API_TOKEN')
        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        self.local = threading.local()
        self.block_cache = arvados.KeepBlockCache()

    def localapi(self):
        if 'api' not in self.local.__dict__:
            self.local.api = arvados.api('v1', False, self.host,
                                         self.api_token, self.insecure)
        return self.local.api

    def localkeep(self):
        if 'keep' not in self.local.__dict__:
            self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
        return self.local.keep

    def __getattr__(self, name):
        # Proxy nonexistent attributes to the local API client.
        try:
            return getattr(self.localapi(), name)
        except AttributeError:
            return super(SafeApi, self).__getattr__(name)
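# Illustrative sketch (not part of the driver): how SafeApi is intended to be
# used. Each thread that touches the API through localapi()/localkeep() (or
# through the __getattr__ proxy) gets its own apiclient and KeepClient, since
# httplib2 connections cannot be shared across threads; only the block cache
# is shared. The config argument below is a hypothetical stand-in for
# whatever settings object the caller normally passes in.
def _example_safeapi_per_thread(config):
    api = SafeApi(config)

    def worker():
        # Each thread lazily builds its own client on first use.
        collections = api.collections().list(limit=1).execute()
        _logger.debug("thread %s saw %d collection(s)",
                      threading.current_thread().name,
                      len(collections['items']))

    threads = [threading.Thread(target=worker) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()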
def convertTime(t):
    '''Parse Arvados timestamp to unix time.'''
    try:
        return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
    except (TypeError, ValueError):
        return 0


def sanitize_filename(dirty):
    '''Replace disallowed filename characters with harmless "_".'''
    return _disallowed_filename_characters.sub('_', dirty)
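# Illustrative sketch (not part of the driver): expected behaviour of the two
# helpers above. convertTime() maps an Arvados API timestamp to seconds since
# the epoch (falling back to 0 on unparseable input), and sanitize_filename()
# replaces characters FUSE cannot represent with underscores. The sample
# values are made up.
def _example_timestamp_and_filename_helpers():
    assert convertTime("1970-01-02T00:00:00Z") == 86400
    assert convertTime(None) == 0                        # bad input falls back to 0
    assert sanitize_filename("foo/bar") == "foo_bar"     # '/' is never allowed in a name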
class FreshBase(object):
    '''Base class for maintaining fresh/stale state to determine when to update.'''

    def __init__(self):
        self._last_update = time.time()
        self._atime = time.time()

    # Mark the value as stale

    # Test if the entries dict is stale.
            return (self._last_update + self._poll_time) < self._atime

        self._last_update = time.time()
class File(FreshBase):
    '''Base for file objects.'''

    def __init__(self, parent_inode, _mtime=0):
        super(File, self).__init__()
        self.parent_inode = parent_inode

    def readfrom(self, off, size):
        return ''


class StreamReaderFile(File):
    '''Wraps a StreamFileReader as a file.'''

    def __init__(self, parent_inode, reader, _mtime):
        super(StreamReaderFile, self).__init__(parent_inode, _mtime)
        self.reader = reader

    def size(self):
        return self.reader.size()

    def readfrom(self, off, size):
        return self.reader.readfrom(off, size)
class StringFile(File):
    '''Wrap a simple string as a file'''

    def __init__(self, parent_inode, contents, _mtime):
        super(StringFile, self).__init__(parent_inode, _mtime)
        self.contents = contents

    def size(self):
        return len(self.contents)

    def readfrom(self, off, size):
        return self.contents[off:(off+size)]


class ObjectFile(StringFile):
    '''Wrap a dict as a serialized json object.'''

    def __init__(self, parent_inode, obj):
        super(ObjectFile, self).__init__(parent_inode, "", 0)
        self.uuid = obj['uuid']

    def update(self, obj):
        self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
        self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
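# Illustrative sketch (not part of the driver): what an ObjectFile exposes.
# The record below is made up; reading the file returns the record
# pretty-printed as JSON, and its modification time tracks the record's
# modified_at field.
def _example_object_file():
    record = {
        'uuid': 'zzzzz-tpzed-aaaaaaaaaaaaaaa',      # hypothetical uuid
        'modified_at': '1970-01-02T00:00:00Z',
    }
    f = ObjectFile(parent_inode=1, obj=record)
    f.update(record)
    assert f._mtime == 86400
    assert json.loads(f.readfrom(0, f.size())) == record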
class Directory(FreshBase):
    '''Generic directory object, backed by a dict.
    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    '''

    def __init__(self, parent_inode):
        super(Directory, self).__init__()

        '''parent_inode is the integer inode number'''
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self._entries = {}
        self._mtime = time.time()

    # Overridden by subclasses to implement logic to update the entries dict
    # when the directory is stale
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    def size(self):
        return 0

    def checkupdate(self):
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.debug(e)

    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    def items(self):
        self.checkupdate()
        return self._entries.items()

    def __iter__(self):
        self.checkupdate()
        return self._entries.iterkeys()

    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries

    def merge(self, items, fn, same, new_entry):
        '''Helper method for updating the contents of the directory. Takes a list
        describing the new contents of the directory; reuses entries that are
        the same in both the old and new lists, creates new entries, and deletes
        old entries missing from the new list.

        items: iterable with new directory contents

        fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.
        '''
        oldentries = self._entries
        self._entries = {}
        for i in items:
            name = sanitize_filename(fn(i))
            if name:
                if name in oldentries and same(oldentries[name], i):
                    # move existing directory entry over
                    self._entries[name] = oldentries[name]
                    del oldentries[name]
                else:
                    # create new directory entry
                    ent = new_entry(i)
                    self._entries[name] = self.inodes.add_entry(ent)

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            llfuse.invalidate_entry(self.inode, str(i))
            self.inodes.del_entry(oldentries[i])

        self._mtime = time.time()

    def clear(self):
        '''Delete all entries'''
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            if isinstance(n, Directory):
                n.clear()
            llfuse.invalidate_entry(self.inode, str(n))
            self.inodes.del_entry(oldentries[n])
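# Illustrative sketch (not part of the driver): how a Directory subclass's
# update() typically drives merge(). The three callables decide, for each
# record, what to call it, whether an existing entry can be reused, and how
# to build a new entry. The record list below is made up; real subclasses
# (for example TagDirectory later in this file) pass records straight from
# the Arvados API.
class _ExampleDirectory(Directory):
    def __init__(self, parent_inode, inodes, records):
        super(_ExampleDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.records = records

    def update(self):
        self.merge(self.records,
                   lambda i: i['name'],                        # fn: filename for the record
                   lambda a, i: a.contents == i['contents'],   # same: reuse unchanged entries
                   lambda i: StringFile(self.inode, i['contents'], 0))  # new_entry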
class CollectionDirectory(Directory):
    '''Represents the root of a directory tree holding a collection.'''

    def __init__(self, parent_inode, inodes, api, num_retries, collection):
        super(CollectionDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self.collection_object_file = None
        self.collection_object = None
        if isinstance(collection, dict):
            self.collection_locator = collection['uuid']
        else:
            self.collection_locator = collection

    def same(self, i):
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def new_collection(self, new_collection_object, coll_reader):
        self.collection_object = new_collection_object

        if self.collection_object_file is not None:
            self.collection_object_file.update(self.collection_object)

        for s in coll_reader.all_streams():
            cwd = self
            for part in s.name().split('/'):
                if part != '' and part != '.':
                    partname = sanitize_filename(part)
                    if partname not in cwd._entries:
                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
                    cwd = cwd._entries[partname]
            for k, v in s.files().items():
                cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))

    def update(self):
        try:
            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                return True

            with llfuse.lock_released:
                coll_reader = arvados.CollectionReader(
                    self.collection_locator, self.api, self.api.localkeep(),
                    num_retries=self.num_retries)
                new_collection_object = coll_reader.api_response() or {}
                # If the Collection only exists in Keep, there will be no API
                # response. Fill in the fields we need.
                if 'uuid' not in new_collection_object:
                    new_collection_object['uuid'] = self.collection_locator
                if "portable_data_hash" not in new_collection_object:
                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
                if 'manifest_text' not in new_collection_object:
                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
                coll_reader.normalize()
            # end with llfuse.lock_released, re-acquire lock

            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                self.new_collection(new_collection_object, coll_reader)
        except apiclient.errors.NotFoundError:
            _logger.exception("arv-mount %s: error", self.collection_locator)
        except arvados.errors.ArgumentError as detail:
            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
        except Exception:
            _logger.exception("arv-mount %s: error", self.collection_locator)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#collection':
            if self.collection_object_file is None:
                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
                self.inodes.add_entry(self.collection_object_file)
            return self.collection_object_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def mtime(self):
        return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
class MagicDirectory(Directory):
    '''A special directory that logically contains the set of all extant keep
    locators. When a file is referenced by lookup(), it is tested to see if it
    is a valid keep locator to a manifest, and if so, loads the manifest
    contents as a subdirectory of this directory with the locator as the
    directory name. Since querying a list of all extant keep locators is
    impractical, only collections that have already been accessed are visible
    to readdir().
    '''

    README_TEXT = '''
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
'''

    def __init__(self, parent_inode, inodes, api, num_retries):
        super(MagicDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries

    def __setattr__(self, name, value):
        super(MagicDirectory, self).__setattr__(name, value)
        # When we're assigned an inode, add a README.
        if ((name == 'inode') and (self.inode is not None) and
                (not self._entries)):
            self._entries['README'] = self.inodes.add_entry(
                StringFile(self.inode, self.README_TEXT, time.time()))
            # If we're the root directory, add an identical by_id subdirectory.
            if self.inode == llfuse.ROOT_INODE:
                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
                    self.inode, self.inodes, self.api, self.num_retries))

    def __contains__(self, k):
        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        try:
            e = self.inodes.add_entry(CollectionDirectory(
                self.inode, self.inodes, self.api, self.num_retries, k))
            if e.update():
                self._entries[k] = e
                return True
            return False
        except Exception as e:
            _logger.debug('arv-mount exception keep %s', e)
            return False

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)
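# Illustrative sketch (not part of the driver): the on-demand lookup behaviour
# of MagicDirectory as seen through a mount. Listing a MagicDirectory shows
# only README (plus by_id when it is the mount root), but looking up a path
# component that is a collection uuid or portable data hash goes through
# __contains__, which loads the collection and materialises it as a
# subdirectory. The mount point and identifier below are made up.
def _example_magic_directory_access(mountpoint='/mnt/keep'):
    import os
    print(os.listdir(mountpoint))                   # typically just ['README', 'by_id']
    pdh = '1234567890abcdefghijklmnopqrstuv+123'    # hypothetical portable data hash
    path = os.path.join(mountpoint, pdh)
    if os.path.isdir(path):                         # this lookup is what loads the collection
        print(os.listdir(path))                     # files and subdirectories of the collection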
class RecursiveInvalidateDirectory(Directory):
    def invalidate(self):
        if self.inode == llfuse.ROOT_INODE:
            llfuse.lock.acquire()
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        finally:
            if self.inode == llfuse.ROOT_INODE:
                llfuse.lock.release()
class TagsDirectory(RecursiveInvalidateDirectory):
    '''A special directory that contains as subdirectories all tags visible to the user.'''

    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            tags = self.api.links().list(
                filters=[['link_class', '=', 'tag']],
                select=['name'], distinct=True
                ).execute(num_retries=self.num_retries)
        self.merge(tags['items'],
                   lambda i: i['name'] if 'name' in i else i['uuid'],
                   lambda a, i: a.tag == i,
                   lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
class TagDirectory(Directory):
    '''A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    '''

    def __init__(self, parent_inode, inodes, api, num_retries, tag,
                 poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(
                filters=[['link_class', '=', 'tag'],
                         ['name', '=', self.tag],
                         ['head_uuid', 'is_a', 'arvados#collection']],
                ).execute(num_retries=self.num_retries)
        self.merge(taggedcollections['items'],
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
class ProjectDirectory(Directory):
    '''A special directory that contains the contents of a project.'''

    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                 poll=False, poll_time=60):
        super(ProjectDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self.project_object = project_object
        self.project_object_file = None
        self.uuid = project_object['uuid']
        self._poll = poll
        self._poll_time = poll_time

    def createDirectory(self, i):
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)

    def update(self):
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            if i['name'] is None or len(i['name']) == 0:
                return None
            elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                # collection or subproject
                return i['name']
            elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                # name link
                return i['name']
            elif 'kind' in i and i['kind'].startswith('arvados#'):
                # something else
                return "{}.{}".format(i['name'], i['kind'][8:])

        def samefn(a, i):
            if isinstance(a, CollectionDirectory):
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid == i['uuid'] and not a.stale()
            return False

        with llfuse.lock_released:
            if group_uuid_pattern.match(self.uuid):
                self.project_object = self.api.groups().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)
            elif user_uuid_pattern.match(self.uuid):
                self.project_object = self.api.users().get(
                    uuid=self.uuid).execute(num_retries=self.num_retries)

            contents = arvados.util.list_all(self.api.groups().contents,
                                             self.num_retries, uuid=self.uuid)
            # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
            contents += arvados.util.list_all(
                self.api.links().list, self.num_retries,
                filters=[['tail_uuid', '=', self.uuid],
                         ['link_class', '=', 'name']])
        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   namefn,
                   samefn,
                   self.createDirectory)

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)
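# Illustrative sketch (not part of the driver): how createDirectory() above
# routes a project item to an entry type purely by the shape of its uuid.
# The identifiers are made up but follow the Arvados uuid conventions
# (infix 4zz18 for collections, j7d0g for groups/projects, o0j2j for links).
def _example_uuid_dispatch():
    assert collection_uuid_pattern.match('zzzzz-4zz18-aaaaaaaaaaaaaaa')   # -> CollectionDirectory
    assert group_uuid_pattern.match('zzzzz-j7d0g-aaaaaaaaaaaaaaa')        # -> ProjectDirectory
    assert link_uuid_pattern.match('zzzzz-o0j2j-aaaaaaaaaaaaaaa')         # -> CollectionDirectory via head_uuid
    assert uuid_pattern.match('zzzzz-dz642-aaaaaaaaaaaaaaa')              # anything else -> ObjectFile (serialized JSON)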
class SharedDirectory(Directory):
    '''A special directory that represents users or groups who have shared projects with me.'''

    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                 poll=False, poll_time=60):
        super(SharedDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.num_retries = num_retries
        self.current_user = api.users().current().execute(num_retries=num_retries)
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            all_projects = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['group_class','=','project']])
            objects = {}
            for ob in all_projects:
                objects[ob['uuid']] = ob

            roots = []
            root_owners = {}
            for ob in all_projects:
                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                    roots.append(ob)
                    root_owners[ob['owner_uuid']] = True

            lusers = arvados.util.list_all(
                self.api.users().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])
            lgroups = arvados.util.list_all(
                self.api.groups().list, self.num_retries,
                filters=[['uuid','in', list(root_owners)]])

            for l in lusers:
                objects[l["uuid"]] = l
            for l in lgroups:
                objects[l["uuid"]] = l

            contents = {}
            for r in root_owners:
                if r in objects:
                    obr = objects[r]
                    if "name" in obr:
                        contents[obr["name"]] = obr
                    if "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

            for r in roots:
                if r['owner_uuid'] not in objects:
                    contents[r['name']] = r
        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents.items(),
                   lambda i: i[0],
                   lambda a, i: a.uuid == i[1]['uuid'],
                   lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
class FileHandle(object):
    '''Connects a numeric file handle to a File or Directory object that has
    been opened by the client.'''

    def __init__(self, fh, entry):
        self.fh = fh
        self.entry = entry


class Inodes(object):
    '''Manage the set of inodes. This is the mapping from a numeric id
    to a concrete File or Directory object'''

    def __init__(self):
        self._entries = {}
        self._counter = itertools.count(llfuse.ROOT_INODE)

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        return self._entries.iterkeys()

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def add_entry(self, entry):
        entry.inode = next(self._counter)
        self._entries[entry.inode] = entry
        return entry

    def del_entry(self, entry):
        llfuse.invalidate_inode(entry.inode)
        del self._entries[entry.inode]
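# Illustrative sketch (not part of the driver): the Inodes registry hands out
# consecutive inode numbers starting at llfuse.ROOT_INODE and lets the FUSE
# callbacks translate a numeric inode back into the object it belongs to.
def _example_inode_registry():
    inodes = Inodes()
    root = inodes.add_entry(Directory(llfuse.ROOT_INODE))
    readme = inodes.add_entry(StringFile(root.inode, "hello\n", time.time()))
    assert root.inode == llfuse.ROOT_INODE      # the first entry gets the root inode number
    assert inodes[readme.inode] is readme       # a numeric id maps back to its object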
class Operations(llfuse.Operations):
    '''This is the main interface with llfuse. The methods on this object are
    called by llfuse threads to service FUSE events to query and read from
    the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using "with llfuse.lock_released:"'''

    def __init__(self, uid, gid, encoding="utf-8"):
        super(Operations, self).__init__()

        self.inodes = Inodes()
        self.uid = uid
        self.gid = gid
        self.encoding = encoding

        # dict of inode to filehandle
        self._filehandles = {}
        self._filehandles_counter = 1

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

    def init(self):
        # Allow threads that are waiting for the driver to be finished
        # initializing to continue
        self.initlock.set()

    def access(self, inode, mode, ctx):
        return True

    def getattr(self, inode):
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.entry_timeout = 300
        entry.attr_timeout = 300

        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        elif isinstance(e, StreamReaderFile):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
        else:
            entry.st_mode |= stat.S_IFREG

        entry.st_uid = self.uid
        entry.st_gid = self.gid

        entry.st_size = e.size()

        entry.st_blksize = 512
        entry.st_blocks = (e.size()/512)+1
        entry.st_atime = int(e.atime())
        entry.st_mtime = int(e.mtime())
        entry.st_ctime = int(e.mtime())

        return entry
    def lookup(self, parent_inode, name):
        name = unicode(name, self.encoding)
        _logger.debug("arv-mount lookup: parent_inode %i name %s",
                      parent_inode, name)

        inode = None

        if parent_inode in self.inodes:
            p = self.inodes[parent_inode]
            if name == '..':
                inode = p.parent_inode
            elif isinstance(p, Directory) and name in p:
                inode = p[name].inode

        if inode:
            return self.getattr(inode)
        else:
            raise llfuse.FUSEError(errno.ENOENT)

    def open(self, inode, flags):
        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
            raise llfuse.FUSEError(errno.EROFS)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        self._filehandles[fh] = FileHandle(fh, p)
        return fh

    def read(self, fh, off, size):
        _logger.debug("arv-mount read %i %i %i", fh, off, size)
        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        # update atime
        handle.entry._atime = time.time()

        try:
            with llfuse.lock_released:
                return handle.entry.readfrom(off, size)
        except arvados.errors.NotFoundError as e:
            _logger.warning("Block not found: " + str(e))
            raise llfuse.FUSEError(errno.EIO)
        except Exception:
            raise llfuse.FUSEError(errno.EIO)

    def release(self, fh):
        if fh in self._filehandles:
            del self._filehandles[fh]
    def opendir(self, inode):
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
        else:
            raise llfuse.FUSEError(errno.EIO)

        # update atime
        p._atime = time.time()

        self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
        return fh

    def readdir(self, fh, off):
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        _logger.debug("arv-mount handle.entry %s", handle.entry)

        e = off
        while e < len(handle.entry):
            if handle.entry[e][1].inode in self.inodes:
                try:
                    yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
                except UnicodeEncodeError:
                    pass
            e += 1

    def releasedir(self, fh):
        del self._filehandles[fh]

    def statfs(self):
        st = llfuse.StatvfsData()
        st.f_bsize = 64 * 1024
    # The llfuse documentation recommends only overloading functions that
    # are actually implemented, as the default implementation will raise ENOSYS.
    # However, there is a bug in the llfuse default implementation of create()
    # "create() takes exactly 5 positional arguments (6 given)" which will crash
    # arv-mount.
    # The workaround is to implement it with the proper number of parameters,
    # and then everything works out.
    def create(self, p1, p2, p3, p4, p5):
        raise llfuse.FUSEError(errno.EROFS)
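

# Illustrative sketch (not part of the driver): roughly how this Operations
# class gets wired into a mount. The exact llfuse entry points and mount
# options vary by llfuse version, and arv-mount itself layers argument
# parsing, signal handling and directory-tree setup on top of this, so treat
# the names below (mount point, config source, retry count) as assumptions.
def _example_mount(mountpoint='/mnt/keep'):
    api = SafeApi(arvados.config)
    operations = Operations(os.getuid(), os.getgid())

    # Build a root directory; here, a MagicDirectory serving collections by id.
    operations.inodes.add_entry(MagicDirectory(llfuse.ROOT_INODE,
                                               operations.inodes,
                                               api,
                                               num_retries=4))

    llfuse.init(operations, mountpoint, ['fsname=arv-mount', 'ro'])
    try:
        llfuse.main()
    finally:
        llfuse.close()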