# FUSE driver for Arvados Keep

import os
import errno
import stat
import threading
import json
import re
import time
import calendar
import logging

import arvados
import apiclient
import llfuse
from llfuse import FUSEError

_logger = logging.getLogger('arvados.arvados_fuse')

portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')

class SafeApi(object):
    '''Threadsafe wrapper for API object.  This stores and returns a different
    api object per thread, because httplib2, which underlies apiclient, is not
    threadsafe.
    '''

    def __init__(self, config):
        self.host = config.get('ARVADOS_API_HOST')
        self.token = config.get('ARVADOS_API_TOKEN')
        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        self.local = threading.local()

    def localapi(self):
        if 'api' not in self.local.__dict__:
            self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
        return self.local.api

    def collections(self):
        return self.localapi().collections()

    def links(self):
        return self.localapi().links()

    def groups(self):
        return self.localapi().groups()

    def users(self):
        return self.localapi().users()
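
# Illustrative sketch (not part of the driver): SafeApi gives each thread its
# own apiclient object, so concurrent FUSE request handlers never share an
# httplib2 connection.  Assuming the arvados.config module is passed in as
# 'config', usage looks roughly like:
#
#   api = SafeApi(arvados.config)
#
#   def worker():
#       # the first call on this thread lazily builds a thread-local client
#       print api.collections().list(limit=1).execute()
#
#   threading.Thread(target=worker).start()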

def convertTime(t):
    '''Parse Arvados timestamp to unix time.'''
    try:
        return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
    except (TypeError, ValueError):
        return 0
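
# Illustrative examples of the helper above:
#   convertTime("2014-01-01T00:00:00Z")  -> 1388534400
#   convertTime(None)                    -> 0  (TypeError is swallowed)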

def sanitize_filename(dirty):
    '''Remove troublesome characters from filenames.'''
    # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
    if dirty is None:
        return None

    fn = ""
    for c in dirty:
        if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
            # skip control characters and /
            continue
        fn += c

    # strip leading - or ~ and leading/trailing whitespace
    stripped = fn.lstrip("-~ ").rstrip()
    return stripped if len(stripped) > 0 else None
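
# Illustrative examples (assuming the reconstruction above):
#   sanitize_filename("foo/bar\x01baz")  -> "foobarbaz"  ('/' and control chars dropped)
#   sanitize_filename("-rf ~tmp ")       -> "rf ~tmp"    (leading '-' and trailing space stripped)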

class FreshBase(object):
    '''Base class for maintaining fresh/stale state to determine when to update.'''

    def __init__(self):
        self._stale = True
        self._poll = False
        self._poll_time = 60
        self._last_update = time.time()
        self._atime = time.time()

    # Mark the value as stale
    def invalidate(self):
        self._stale = True

    # Test if the entries dict is stale.
    def stale(self):
        if self._stale:
            return True
        if self._poll:
            return (self._last_update + self._poll_time) < self._atime
        return False

    def fresh(self):
        self._stale = False
        self._last_update = time.time()

    def atime(self):
        return self._atime

class File(FreshBase):
    '''Base for file objects.'''

    def __init__(self, parent_inode, _mtime=0):
        super(File, self).__init__()
        self.inode = None
        self.parent_inode = parent_inode
        self._mtime = _mtime

    def size(self):
        return 0

    def readfrom(self, off, size):
        return ''

    def mtime(self):
        return self._mtime

class StreamReaderFile(File):
    '''Wraps a StreamFileReader as a file.'''

    def __init__(self, parent_inode, reader, _mtime):
        super(StreamReaderFile, self).__init__(parent_inode, _mtime)
        self.reader = reader

    def size(self):
        return self.reader.size()

    def readfrom(self, off, size):
        return self.reader.readfrom(off, size)

class StringFile(File):
    '''Wrap a simple string as a file.'''

    def __init__(self, parent_inode, contents, _mtime):
        super(StringFile, self).__init__(parent_inode, _mtime)
        self.contents = contents

    def size(self):
        return len(self.contents)

    def readfrom(self, off, size):
        return self.contents[off:(off+size)]

class ObjectFile(StringFile):
    '''Wrap a dict as a serialized JSON object.'''

    def __init__(self, parent_inode, obj):
        super(ObjectFile, self).__init__(parent_inode, "", 0)
        self.uuid = obj['uuid']
        self.update(obj)

    def update(self, obj):
        self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
        self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"

class Directory(FreshBase):
    '''Generic directory object, backed by a dict.
    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.
    '''

    def __init__(self, parent_inode):
        super(Directory, self).__init__()

        # parent_inode is the integer inode number
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self._entries = {}
        self._mtime = time.time()

    # Overridden by subclasses to implement logic to update the entries dict
    # when the directory is stale
    def update(self):
        pass

    # Only used when computing the size of the disk footprint of the directory
    def size(self):
        return 0

    def checkupdate(self):
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                _logger.debug(e)

    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    def items(self):
        self.checkupdate()
        return self._entries.items()

    def __iter__(self):
        self.checkupdate()
        return self._entries.iterkeys()

    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries

    def merge(self, items, fn, same, new_entry):
        '''Helper method for updating the contents of the directory.  Takes a list
        describing the new contents of the directory, reuses entries that are
        the same in both the old and new lists, creates new entries, and
        deletes old entries missing from the new list.

        items: iterable with new directory contents

        fn: function to take an entry in 'items' and return the desired file or
        directory name, or None if this entry should be skipped

        same: function to compare an existing entry (a File or Directory
        object) with an entry in the items list to determine whether to keep
        the existing entry.

        new_entry: function to create a new directory entry (File or Directory
        object) from an entry in the items list.
        '''

        oldentries = self._entries
        self._entries = {}
        for i in items:
            name = sanitize_filename(fn(i))
            if name is None:
                continue
            if name in oldentries and same(oldentries[name], i):
                # move existing directory entry over
                self._entries[name] = oldentries[name]
                del oldentries[name]
            else:
                # create new directory entry
                ent = new_entry(i)
                if ent is not None:
                    self._entries[name] = self.inodes.add_entry(ent)

        # delete any other directory entries that were not found in 'items'
        for i in oldentries:
            llfuse.invalidate_entry(self.inode, str(i))
            self.inodes.del_entry(oldentries[i])

        self.fresh()
        self._mtime = time.time()
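
    # Illustrative sketch of how subclasses call merge() above (hypothetical
    # API listing; the real callers appear further down in this module):
    #
    #   self.merge(listing['items'],
    #              lambda i: i['name'],                              # fn: pick a filename
    #              lambda a, i: a.collection_locator == i['uuid'],   # same: reuse if unchanged
    #              lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i))  # new_entry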

    def mtime(self):
        return self._mtime

    def clear(self):
        '''Delete all entries'''
        oldentries = self._entries
        self._entries = {}
        for n in oldentries:
            if isinstance(oldentries[n], Directory):
                # clear subdirectories recursively
                oldentries[n].clear()
            llfuse.invalidate_entry(self.inode, str(n))
            self.inodes.del_entry(oldentries[n])
        self.invalidate()

class CollectionDirectory(Directory):
    '''Represents the root of a directory tree holding a collection.'''

    def __init__(self, parent_inode, inodes, api, collection):
        super(CollectionDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.collection_object_file = None
        self.collection_object = None
        if isinstance(collection, dict):
            self.collection_locator = collection['uuid']
        else:
            self.collection_locator = collection

    def same(self, i):
        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator

    def new_collection(self, new_collection_object):
        self.collection_object = new_collection_object

        if self.collection_object_file is not None:
            self.collection_object_file.update(self.collection_object)

        self.clear()
        collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
        for s in collection.all_streams():
            cwd = self
            for part in s.name().split('/'):
                if part != '' and part != '.':
                    partname = sanitize_filename(part)
                    if partname not in cwd._entries:
                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
                    cwd = cwd._entries[partname]
            for k, v in s.files().items():
                cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
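
    # Illustrative example: a stream named "./docs/reports" in the manifest,
    # e.g. "./docs/reports <block-locator>+43 0:43:summary.txt", becomes
    # docs/reports/summary.txt under this directory, with intermediate
    # Directory objects created on demand and the file served by a
    # StreamReaderFile wrapping the stream's StreamFileReader.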

    def update(self):
        try:
            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
                # Mounted by portable data hash, so the content is immutable
                # and never needs to be refreshed.
                return True

            with llfuse.lock_released:
                new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
                if "portable_data_hash" not in new_collection_object:
                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
            # end with llfuse.lock_released, re-acquire lock

            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                self.new_collection(new_collection_object)

            self.fresh()
            return True
        except apiclient.errors.HttpError as e:
            if e.resp.status == 404:
                _logger.warn("arv-mount %s: not found", self.collection_locator)
            else:
                _logger.error("arv-mount %s: error", self.collection_locator)
                _logger.exception(e)
        except Exception as detail:
            _logger.error("arv-mount %s: error", self.collection_locator)
            if self.collection_object is not None and "manifest_text" in self.collection_object:
                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
            _logger.exception(detail)
        return False

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#collection':
            if self.collection_object_file is None:
                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
                self.inodes.add_entry(self.collection_object_file)
            return self.collection_object_file
        else:
            return super(CollectionDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#collection':
            return True
        else:
            return super(CollectionDirectory, self).__contains__(k)

    def mtime(self):
        self.checkupdate()
        return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0

class MagicDirectory(Directory):
    '''A special directory that logically contains the set of all extant keep
    locators.  When a file is referenced by lookup(), it is tested to see if it
    is a valid keep locator to a manifest, and if so, loads the manifest
    contents as a subdirectory of this directory with the locator as the
    directory name.  Since querying a list of all extant keep locators is
    impractical, only collections that have already been accessed are visible
    to readdir().
    '''

    def __init__(self, parent_inode, inodes, api):
        super(MagicDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        # Have to defer creating readme_file because at this point we don't
        # yet have an inode assigned.
        self.readme_file = None

    def create_readme(self):
        if self.readme_file is None:
            text = '''This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
the form '1234567890abcdefghijklmnopqrstuv+123').

Note that this directory will appear empty until you attempt to access a
specific collection subdirectory (such as trying to 'cd' into it), at which
point the collection will actually be looked up on the server and the directory
will appear if it exists.
'''
            self.readme_file = self.inodes.add_entry(StringFile(self.inode, text, time.time()))
            self._entries["README"] = self.readme_file

    def __contains__(self, k):
        self.create_readme()

        if k in self._entries:
            return True

        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
            return False

        try:
            e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
            if e.update():
                self._entries[k] = e
                return True
            else:
                return False
        except Exception as e:
            _logger.debug('arv-mount exception keep %s', e)
            return False

    def items(self):
        self.create_readme()
        return self._entries.items()

    def __getitem__(self, item):
        if item in self:
            return self._entries[item]
        else:
            raise KeyError("No collection with id " + item)
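
# Illustrative sketch (not part of the module): membership tests drive the
# on-demand loading above, so a FUSE lookup such as
#
#   magic = MagicDirectory(llfuse.ROOT_INODE, inodes, api)  # hypothetical setup
#   '1234567890abcdefghijklmnopqrstuv+123' in magic
#
# fetches and mounts that collection, and evaluates False only if the locator
# is malformed or the collection cannot be retrieved.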

class RecursiveInvalidateDirectory(Directory):
    def invalidate(self):
        if self.inode == llfuse.ROOT_INODE:
            llfuse.lock.acquire()
        try:
            super(RecursiveInvalidateDirectory, self).invalidate()
            for a in self._entries:
                self._entries[a].invalidate()
        except Exception as e:
            _logger.exception(e)
        finally:
            if self.inode == llfuse.ROOT_INODE:
                llfuse.lock.release()

class TagsDirectory(RecursiveInvalidateDirectory):
    '''A special directory that contains as subdirectories all tags visible to the user.'''

    def __init__(self, parent_inode, inodes, api, poll_time=60):
        super(TagsDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct=True).execute()
        self.merge(tags['items'],
                   lambda i: i['name'] if 'name' in i else i['uuid'],
                   lambda a, i: a.tag == (i['name'] if 'name' in i else i['uuid']),
                   lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))

class TagDirectory(Directory):
    '''A special directory that contains as subdirectories all collections visible
    to the user that are tagged with a particular tag.
    '''

    def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
        super(TagDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.tag = tag
        self._poll = poll
        self._poll_time = poll_time

    def update(self):
        with llfuse.lock_released:
            taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
                                                               ['name', '=', self.tag],
                                                               ['head_uuid', 'is_a', 'arvados#collection']],
                                                      select=['head_uuid']).execute()
        self.merge(taggedcollections['items'],
                   lambda i: i['head_uuid'],
                   lambda a, i: a.collection_locator == i['head_uuid'],
                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))

class ProjectDirectory(Directory):
    '''A special directory that contains the contents of a project.'''

    def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
        super(ProjectDirectory, self).__init__(parent_inode)
        self.inodes = inodes
        self.api = api
        self.project_object = project_object
        self.project_object_file = None
        self.uuid = project_object['uuid']

    def createDirectory(self, i):
        if collection_uuid_pattern.match(i['uuid']):
            return CollectionDirectory(self.inode, self.inodes, self.api, i)
        elif group_uuid_pattern.match(i['uuid']):
            return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
        elif link_uuid_pattern.match(i['uuid']):
            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
                return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
            else:
                return None
        elif uuid_pattern.match(i['uuid']):
            return ObjectFile(self.parent_inode, i)
        else:
            return None

    def update(self):
        if self.project_object_file is None:
            self.project_object_file = ObjectFile(self.inode, self.project_object)
            self.inodes.add_entry(self.project_object_file)

        def namefn(i):
            if 'name' in i:
                if i['name'] is None or len(i['name']) == 0:
                    return None
                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
                    # collection or subproject
                    return i['name']
                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
                    # name link
                    return i['name']
                elif 'kind' in i and i['kind'].startswith('arvados#'):
                    # something else
                    return "{}.{}".format(i['name'], i['kind'][8:])
            return None

        def samefn(a, i):
            if isinstance(a, CollectionDirectory):
                return a.collection_locator == i['uuid']
            elif isinstance(a, ProjectDirectory):
                return a.uuid == i['uuid']
            elif isinstance(a, ObjectFile):
                return a.uuid == i['uuid'] and not a.stale()
            return False

        with llfuse.lock_released:
            if group_uuid_pattern.match(self.uuid):
                self.project_object = self.api.groups().get(uuid=self.uuid).execute()
            elif user_uuid_pattern.match(self.uuid):
                self.project_object = self.api.users().get(uuid=self.uuid).execute()

            contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
            # Name links will be obsolete soon; take this out when there are no
            # more pre-#3036 name links in use.
            contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])

        # end with llfuse.lock_released, re-acquire lock

        self.merge(contents,
                   namefn,
                   samefn,
                   self.createDirectory)

    def __getitem__(self, item):
        self.checkupdate()
        if item == '.arvados#project':
            return self.project_object_file
        else:
            return super(ProjectDirectory, self).__getitem__(item)

    def __contains__(self, k):
        if k == '.arvados#project':
            return True
        else:
            return super(ProjectDirectory, self).__contains__(k)

class SharedDirectory(Directory):
    '''A special directory that represents users or groups who have shared projects with me.'''

    def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
        super(SharedDirectory, self).__init__(parent_inode)
        self.current_user = api.users().current().execute()
        self.inodes = inodes
        self.api = api
        self._poll = True
        self._poll_time = poll_time

    def update(self):
        try:
            with llfuse.lock_released:
                all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class', '=', 'project']])
                objects = {}
                for ob in all_projects:
                    objects[ob['uuid']] = ob

                roots = []
                root_owners = {}
                for ob in all_projects:
                    if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
                        roots.append(ob)
                        root_owners[ob['owner_uuid']] = True

                lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid', 'in', list(root_owners)]])
                lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid', 'in', list(root_owners)]])

                for l in lusers:
                    objects[l["uuid"]] = l
                for l in lgroups:
                    objects[l["uuid"]] = l

                contents = {}
                for r in root_owners:
                    obr = objects.get(r)
                    if obr is None:
                        continue
                    if "name" in obr:
                        contents[obr["name"]] = obr
                    if "first_name" in obr:
                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr

                for r in roots:
                    if r['owner_uuid'] not in objects:
                        contents[r['name']] = r

            # end with llfuse.lock_released, re-acquire lock

            self.merge(contents.items(),
                       lambda i: i[0],
                       lambda a, i: a.uuid == i[1]['uuid'],
                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
        except Exception as e:
            _logger.exception(e)

class FileHandle(object):
    '''Connects a numeric file handle to a File or Directory object that has
    been opened by the client.'''

    def __init__(self, fh, entry):
        self.fh = fh
        self.entry = entry

class Inodes(object):
    '''Manage the set of inodes.  This is the mapping from a numeric id
    to a concrete File or Directory object.'''

    def __init__(self):
        self._entries = {}
        self._counter = llfuse.ROOT_INODE

    def __getitem__(self, item):
        return self._entries[item]

    def __setitem__(self, key, item):
        self._entries[key] = item

    def __iter__(self):
        return self._entries.iterkeys()

    def items(self):
        return self._entries.items()

    def __contains__(self, k):
        return k in self._entries

    def add_entry(self, entry):
        entry.inode = self._counter
        self._entries[entry.inode] = entry
        self._counter += 1
        return entry

    def del_entry(self, entry):
        llfuse.invalidate_inode(entry.inode)
        del self._entries[entry.inode]
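
# Illustrative sketch: inode numbers are handed out sequentially starting at
# llfuse.ROOT_INODE, so the first entry added becomes the filesystem root:
#
#   inodes = Inodes()
#   root = inodes.add_entry(Directory(llfuse.ROOT_INODE))
#   assert root.inode == llfuse.ROOT_INODE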

class Operations(llfuse.Operations):
    '''This is the main interface with llfuse.  The methods on this object are
    called by llfuse threads to service FUSE events to query and read from
    the file system.

    llfuse has its own global lock which is acquired before calling a request handler,
    so request handlers do not run concurrently unless the lock is explicitly released
    using "with llfuse.lock_released:"
    '''
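
    # Illustrative sketch of the locking convention described above: a handler
    # that blocks on the network drops the global llfuse lock first and
    # re-acquires it automatically when the 'with' block exits, e.g.
    #
    #   def update(self):
    #       with llfuse.lock_released:
    #           listing = self.api.collections().list().execute()  # slow API call, lock released
    #       self.merge(listing['items'], ...)                       # lock held again here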

    def __init__(self, uid, gid):
        super(Operations, self).__init__()

        self.inodes = Inodes()
        self.uid = uid
        self.gid = gid

        # dict of open file/directory handles, keyed by file handle number
        self._filehandles = {}
        self._filehandles_counter = 1

        # Other threads that need to wait until the fuse driver
        # is fully initialized should wait() on this event object.
        self.initlock = threading.Event()

    def init(self):
        # Allow threads that are waiting for the driver to be finished
        # initializing to continue
        self.initlock.set()

    def access(self, inode, mode, ctx):
        return True

    def getattr(self, inode):
        if inode not in self.inodes:
            raise llfuse.FUSEError(errno.ENOENT)

        e = self.inodes[inode]

        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.generation = 0
        entry.entry_timeout = 300
        entry.attr_timeout = 300

        entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        if isinstance(e, Directory):
            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
        else:
            entry.st_mode |= stat.S_IFREG

        entry.st_nlink = 1
        entry.st_uid = self.uid
        entry.st_gid = self.gid
        entry.st_rdev = 0

        entry.st_size = e.size()

        entry.st_blksize = 512
        entry.st_blocks = (e.size()/512)+1
        entry.st_atime = int(e.atime())
        entry.st_mtime = int(e.mtime())
        entry.st_ctime = int(e.mtime())

        return entry

    def lookup(self, parent_inode, name):
        _logger.debug("arv-mount lookup: parent_inode %i name %s",
                      parent_inode, name)
        inode = None

        if name == '.':
            inode = parent_inode
        elif parent_inode in self.inodes:
            p = self.inodes[parent_inode]
            if name == '..':
                inode = p.parent_inode
            elif isinstance(p, Directory) and name in p:
                inode = p[name].inode

        if inode is not None:
            return self.getattr(inode)
        else:
            raise llfuse.FUSEError(errno.ENOENT)

    def open(self, inode, flags):
        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
            raise llfuse.FUSEError(errno.EROFS)

        if isinstance(p, Directory):
            raise llfuse.FUSEError(errno.EISDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        self._filehandles[fh] = FileHandle(fh, p)
        return fh

    def read(self, fh, off, size):
        _logger.debug("arv-mount read %i %i %i", fh, off, size)
        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        # update atime
        handle.entry._atime = time.time()

        try:
            with llfuse.lock_released:
                return handle.entry.readfrom(off, size)
        except Exception:
            raise llfuse.FUSEError(errno.EIO)

    def release(self, fh):
        if fh in self._filehandles:
            del self._filehandles[fh]

    def opendir(self, inode):
        _logger.debug("arv-mount opendir: inode %i", inode)

        if inode in self.inodes:
            p = self.inodes[inode]
        else:
            raise llfuse.FUSEError(errno.ENOENT)

        if not isinstance(p, Directory):
            raise llfuse.FUSEError(errno.ENOTDIR)

        fh = self._filehandles_counter
        self._filehandles_counter += 1
        if p.parent_inode in self.inodes:
            parent = self.inodes[p.parent_inode]
        else:
            raise llfuse.FUSEError(errno.EIO)

        # update atime
        p._atime = time.time()

        self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
        return fh

    def readdir(self, fh, off):
        _logger.debug("arv-mount readdir: fh %i off %i", fh, off)

        if fh in self._filehandles:
            handle = self._filehandles[fh]
        else:
            raise llfuse.FUSEError(errno.EBADF)

        _logger.debug("arv-mount handle.entry %s", handle.entry)

        e = off
        while e < len(handle.entry):
            if handle.entry[e][1].inode in self.inodes:
                yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
            e += 1

    def releasedir(self, fh):
        del self._filehandles[fh]

    def statfs(self):
        st = llfuse.StatvfsData()
        st.f_bsize = 64 * 1024
        st.f_blocks = 0
        st.f_files = 0

        st.f_bfree = 0
        st.f_bavail = 0

        st.f_ffree = 0
        st.f_favail = 0

        st.f_frsize = 0
        return st

    # The llfuse documentation recommends only overloading functions that
    # are actually implemented, as the default implementation will raise ENOSYS.
    # However, there is a bug in the llfuse default implementation of create()
    # ("create() takes exactly 5 positional arguments (6 given)") which will
    # crash arv-mount if a client attempts to create a file.
    # The workaround is to implement create() with the proper number of
    # parameters, and then everything works out.
    def create(self, p1, p2, p3, p4, p5):
        raise llfuse.FUSEError(errno.EROFS)
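
# Illustrative sketch (an assumption about how a mount program drives this
# class; the actual arv-mount entry point lives in a separate script):
#
#   api = SafeApi(arvados.config)
#   operations = Operations(os.getuid(), os.getgid())
#   operations.inodes.add_entry(MagicDirectory(llfuse.ROOT_INODE, operations.inodes, api))
#   llfuse.init(operations, mountpoint, ['fsname=arv-mount'])
#   try:
#       llfuse.main()
#   finally:
#       llfuse.close()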