2 # FUSE driver for Arvados Keep
8 from llfuse import FUSEError
_logger = logging.getLogger('arvados.arvados_fuse')


def _arvados_uuid_pattern(type_infix):
    """Compile a regex for an Arvados uuid whose middle field matches *type_infix*.

    The uuids matched here have the shape ``xxxxx-TTTTT-yyyyyyyyyyyyyyy``:
    5 lowercase alphanumerics, the type field, then 15 lowercase
    alphanumerics. The pattern has no end anchor, so ``.match()`` only checks
    that a string *starts* with such a uuid.
    """
    return re.compile(r'[a-z0-9]{5}-' + type_infix + r'-[a-z0-9]{15}')


# 32 lowercase hex digits, '+', then a decimal number (presumably the size).
# No end anchor: .match() accepts anything that begins with this form.
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+\d+')
# A uuid of any object type.
uuid_pattern = _arvados_uuid_pattern(r'[a-z0-9]{5}')
# Type-specific uuids, distinguished by their middle field.
collection_uuid_pattern = _arvados_uuid_pattern('4zz18')
group_uuid_pattern = _arvados_uuid_pattern('j7d0g')
user_uuid_pattern = _arvados_uuid_pattern('tpzed')
link_uuid_pattern = _arvados_uuid_pattern('o0j2j')
32 class SafeApi(object):
33 '''Threadsafe wrapper for API object. This stores and returns a different api
34 object per thread, because httplib2 which underlies apiclient is not
def __init__(self, config):
    """Record API connection settings taken from *config*.

    No API client is created here. Only the host, the token and the
    "insecure" flag are stored, together with a threading.local() container
    in which each thread later keeps its own client object (see the
    ``'api' not in self.local.__dict__`` check in this class).
    """
    lookup = config.get
    self.host = lookup('ARVADOS_API_HOST')
    self.token = lookup('ARVADOS_API_TOKEN')
    self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
    self.local = threading.local()
45 if 'api' not in self.local.__dict__:
46 self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
def collections(self):
    """Return the ``collections()`` resource of the client given by ``localapi()``."""
    client = self.localapi()
    return client.collections()
53 return self.localapi().links()
56 return self.localapi().groups()
59 return self.localapi().users()
62 '''Parse Arvados timestamp to unix time.'''
63 return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
65 def sanitize_filename(dirty):
66 '''Remove troublesome characters from filenames.'''
67 # http://www.dwheeler.com/essays/fixing-unix-linux-filenames.html
73 if (c >= '\x00' and c <= '\x1f') or c == '\x7f' or c == '/':
74 # skip control characters and /
78 # strip leading - or ~ and leading/trailing whitespace
79 stripped = fn.lstrip("-~ ").rstrip()
86 class FreshBase(object):
87 '''Base class for maintaining fresh/stale state to determine when to update.'''
91 self._last_update = time.time()
92 self._atime = time.time()
95 # Mark the value as stale
99 # Test if the entries dict is stale. Also updates atime.
101 self._atime = time.time()
105 return (self._last_update + self._poll_time) < self._atime
110 self._last_update = time.time()
115 class File(FreshBase):
116 '''Base for file objects.'''
118 def __init__(self, parent_inode, _mtime=0):
119 super(File, self).__init__()
121 self.parent_inode = parent_inode
127 def readfrom(self, off, size):
134 class StreamReaderFile(File):
135 '''Wraps a StreamFileReader as a file.'''
137 def __init__(self, parent_inode, reader, _mtime):
138 super(StreamReaderFile, self).__init__(parent_inode, _mtime)
142 return self.reader.size()
def readfrom(self, off, size):
    """Read *size* units at offset *off* by delegating to the wrapped reader.

    Whatever ``self.reader.readfrom`` returns is passed back unchanged.
    """
    reader = self.reader
    return reader.readfrom(off, size)
151 class StringFile(File):
152 '''Wrap a simple string as a file'''
153 def __init__(self, parent_inode, contents, _mtime):
154 super(StringFile, self).__init__(parent_inode, _mtime)
155 self.contents = contents
158 return len(self.contents)
def readfrom(self, off, size):
    """Return the part of ``self.contents`` covering ``[off, off + size)``.

    Slicing clamps at the end of the string, so a read past the end returns
    a shorter (possibly empty) result instead of raising.
    """
    end = off + size
    return self.contents[off:end]
164 class ObjectFile(StringFile):
165 '''Wrap a dict as a serialized json object.'''
167 def __init__(self, parent_inode, obj):
168 super(ObjectFile, self).__init__(parent_inode, "", 0)
169 self.uuid = obj['uuid']
def update(self, obj):
    """Refresh this file's mtime and its JSON text from the dict *obj*.

    ``_mtime`` comes from ``obj['modified_at']`` (parsed by convertTime())
    when that key is present, and is 0 otherwise. ``contents`` becomes
    *obj* serialized as indented, key-sorted JSON followed by a newline.
    """
    if 'modified_at' in obj:
        self._mtime = convertTime(obj['modified_at'])
    else:
        self._mtime = 0
    serialized = json.dumps(obj, indent=4, sort_keys=True)
    self.contents = serialized + "\n"
177 class Directory(FreshBase):
178 '''Generic directory object, backed by a dict.
179 Consists of a set of entries with the key representing the filename
180 and the value referencing a File or Directory object.
183 def __init__(self, parent_inode):
184 super(Directory, self).__init__()
186 '''parent_inode is the integer inode number'''
188 if not isinstance(parent_inode, int):
189 raise Exception("parent_inode should be an int")
190 self.parent_inode = parent_inode
192 self._mtime = time.time()
194 # Overriden by subclasses to implement logic to update the entries dict
195 # when the directory is stale
199 # Only used when computing the size of the disk footprint of the directory
204 def checkupdate(self):
208 except apiclient.errors.HttpError as e:
211 def __getitem__(self, item):
213 return self._entries[item]
217 return self._entries.items()
221 return self._entries.iterkeys()
223 def __contains__(self, k):
225 return k in self._entries
227 def merge(self, items, fn, same, new_entry):
228 '''Helper method for updating the contents of the directory. Takes a list
229 describing the new contents of the directory, reuse entries that are
230 the same in both the old and new lists, create new entries, and delete
231 old entries missing from the new list.
233 items: iterable with new directory contents
235 fn: function to take an entry in 'items' and return the desired file or
236 directory name, or None if this entry should be skipped
238 same: function to compare an existing entry (a File or Directory
239 object) with an entry in the items list to determine whether to keep
242 new_entry: function to create a new directory entry (File or Directory
243 object) from an entry in the items list.
247 oldentries = self._entries
251 name = sanitize_filename(fn(i))
253 if name in oldentries and same(oldentries[name], i):
254 # move existing directory entry over
255 self._entries[name] = oldentries[name]
258 # create new directory entry
261 self._entries[name] = self.inodes.add_entry(ent)
264 # delete any other directory entries that were not in found in 'items'
266 llfuse.invalidate_entry(self.inode, str(i))
267 self.inodes.del_entry(oldentries[i])
271 self._mtime = time.time()
276 '''Delete all entries'''
277 oldentries = self._entries
280 if isinstance(n, Directory):
282 llfuse.invalidate_entry(self.inode, str(n))
283 self.inodes.del_entry(oldentries[n])
290 class CollectionDirectory(Directory):
291 '''Represents the root of a directory tree holding a collection.'''
293 def __init__(self, parent_inode, inodes, api, collection):
294 super(CollectionDirectory, self).__init__(parent_inode)
297 self.collection_object_file = None
298 self.collection_object = None
299 if isinstance(collection, dict):
300 self.collection_locator = collection['uuid']
302 self.collection_locator = collection
305 return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
307 def new_collection(self, new_collection_object):
308 self.collection_object = new_collection_object
310 if self.collection_object_file is not None:
311 self.collection_object_file.update(self.collection_object)
314 collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
315 for s in collection.all_streams():
317 for part in s.name().split('/'):
318 if part != '' and part != '.':
319 partname = sanitize_filename(part)
320 if partname not in cwd._entries:
321 cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
322 cwd = cwd._entries[partname]
323 for k, v in s.files().items():
324 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
328 if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
331 with llfuse.lock_released:
332 new_collection_object = self.api.collections().get(uuid=self.collection_locator).execute()
333 if "portable_data_hash" not in new_collection_object:
334 new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
335 # end with llfuse.lock_released, re-acquire lock
337 if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
338 self.new_collection(new_collection_object)
342 except apiclient.errors.HttpError as e:
343 if e.resp.status == 404:
344 _logger.warn("arv-mount %s: not found", self.collection_locator)
346 _logger.error("arv-mount %s: error", self.collection_locator)
347 _logger.exception(detail)
348 except Exception as detail:
349 _logger.error("arv-mount %s: error", self.collection_locator)
350 if "manifest_text" in self.collection_object:
351 _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
352 _logger.exception(detail)
355 def __getitem__(self, item):
357 if item == '.arvados#collection':
358 if self.collection_object_file is None:
359 self.collection_object_file = ObjectFile(self.inode, self.collection_object)
360 self.inodes.add_entry(self.collection_object_file)
361 return self.collection_object_file
363 return super(CollectionDirectory, self).__getitem__(item)
365 def __contains__(self, k):
366 if k == '.arvados#collection':
369 return super(CollectionDirectory, self).__contains__(k)
373 return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
376 class MagicDirectory(Directory):
377 '''A special directory that logically contains the set of all extant keep
378 locators. When a file is referenced by lookup(), it is tested to see if it
379 is a valid keep locator to a manifest, and if so, loads the manifest
380 contents as a subdirectory of this directory with the locator as the
381 directory name. Since querying a list of all extant keep locators is
382 impractical, only collections that have already been accessed are visible
386 def __init__(self, parent_inode, inodes, api):
387 super(MagicDirectory, self).__init__(parent_inode)
390 self.readme_file = None
392 def __contains__(self, k):
393 if self.readme_file is None:
394 text = '''This directory provides access to Arvados collections as subdirectories listed
395 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
396 the form '1234567890abcdefghijklmnopqrstuv+123').
398 Note that this directory will appear empty until you attempt to access a
399 specific collection subdirectory (such as trying to 'cd' into it), at which
400 point the collection will actually be looked up on the server and the directory
401 will appear if it exists.
403 self.readme_file = self.inodes.add_entry(StringFile(self.inode, text, time.time()))
404 self._entries["README"] = self.readme_file
406 if k in self._entries:
409 if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
413 e = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, self.api, k))
419 except Exception as e:
420 _logger.debug('arv-mount exception keep %s', e)
423 def __getitem__(self, item):
425 return self._entries[item]
427 raise KeyError("No collection with id " + item)
430 class RecursiveInvalidateDirectory(Directory):
431 def invalidate(self):
432 if self.inode == llfuse.ROOT_INODE:
433 llfuse.lock.acquire()
435 super(RecursiveInvalidateDirectory, self).invalidate()
436 for a in self._entries:
437 self._entries[a].invalidate()
438 except Exception as e:
441 if self.inode == llfuse.ROOT_INODE:
442 llfuse.lock.release()
445 class TagsDirectory(RecursiveInvalidateDirectory):
446 '''A special directory that contains as subdirectories all tags visible to the user.'''
448 def __init__(self, parent_inode, inodes, api, poll_time=60):
449 super(TagsDirectory, self).__init__(parent_inode)
453 self._poll_time = poll_time
456 with llfuse.lock_released:
457 tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
459 self.merge(tags['items'],
460 lambda i: i['name'] if 'name' in i else i['uuid'],
461 lambda a, i: a.tag == i,
462 lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
465 class TagDirectory(Directory):
466 '''A special directory that contains as subdirectories all collections visible
467 to the user that are tagged with a particular tag.
470 def __init__(self, parent_inode, inodes, api, tag, poll=False, poll_time=60):
471 super(TagDirectory, self).__init__(parent_inode)
476 self._poll_time = poll_time
479 with llfuse.lock_released:
480 taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
481 ['name', '=', self.tag],
482 ['head_uuid', 'is_a', 'arvados#collection']],
483 select=['head_uuid']).execute()
484 self.merge(taggedcollections['items'],
485 lambda i: i['head_uuid'],
486 lambda a, i: a.collection_locator == i['head_uuid'],
487 lambda i: CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid']))
490 class ProjectDirectory(RecursiveInvalidateDirectory):
491 '''A special directory that contains the contents of a project.'''
493 def __init__(self, parent_inode, inodes, api, project_object, poll=False, poll_time=60):
494 super(ProjectDirectory, self).__init__(parent_inode)
497 self.project_object = project_object
498 self.project_object_file = ObjectFile(self.inode, self.project_object)
499 self.inodes.add_entry(self.project_object_file)
500 self.uuid = project_object['uuid']
502 def createDirectory(self, i):
503 if collection_uuid_pattern.match(i['uuid']):
504 return CollectionDirectory(self.inode, self.inodes, self.api, i)
505 elif group_uuid_pattern.match(i['uuid']):
506 return ProjectDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
507 elif link_uuid_pattern.match(i['uuid']):
508 if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
509 return CollectionDirectory(self.inode, self.inodes, self.api, i['head_uuid'])
512 elif uuid_pattern.match(i['uuid']):
513 return ObjectFile(self.parent_inode, i)
520 if i['name'] is None or len(i['name']) == 0:
522 elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
523 # collection or subproject
525 elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
528 elif 'kind' in i and i['kind'].startswith('arvados#'):
530 return "{}.{}".format(i['name'], i['kind'][8:])
535 if isinstance(a, CollectionDirectory):
536 return a.collection_locator == i['uuid']
537 elif isinstance(a, ProjectDirectory):
538 return a.uuid == i['uuid']
539 elif isinstance(a, ObjectFile):
540 return a.uuid == i['uuid'] and not a.stale()
543 with llfuse.lock_released:
544 if group_uuid_pattern.match(self.uuid):
545 self.project_object = self.api.groups().get(uuid=self.uuid).execute()
546 elif user_uuid_pattern.match(self.uuid):
547 self.project_object = self.api.users().get(uuid=self.uuid).execute()
549 contents = arvados.util.list_all(self.api.groups().contents, uuid=self.uuid)
550 # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
551 contents += arvados.util.list_all(self.api.links().list, filters=[['tail_uuid', '=', self.uuid], ['link_class', '=', 'name']])
553 # end with llfuse.lock_released, re-acquire lock
558 self.createDirectory)
560 def __getitem__(self, item):
562 if item == '.arvados#project':
563 return self.project_object_file
565 return super(ProjectDirectory, self).__getitem__(item)
567 def __contains__(self, k):
568 if k == '.arvados#project':
571 return super(ProjectDirectory, self).__contains__(k)
574 class SharedDirectory(RecursiveInvalidateDirectory):
575 '''A special directory that represents users or groups who have shared projects with me.'''
577 def __init__(self, parent_inode, inodes, api, exclude, poll=False, poll_time=60):
578 super(SharedDirectory, self).__init__(parent_inode)
579 self.current_user = api.users().current().execute()
583 self._poll_time = poll_time
586 with llfuse.lock_released:
587 all_projects = arvados.util.list_all(self.api.groups().list, filters=[['group_class','=','project']])
589 for ob in all_projects:
590 objects[ob['uuid']] = ob
594 for ob in all_projects:
595 if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
597 root_owners[ob['owner_uuid']] = True
599 lusers = arvados.util.list_all(self.api.users().list, filters=[['uuid','in', list(root_owners)]])
600 lgroups = arvados.util.list_all(self.api.groups().list, filters=[['uuid','in', list(root_owners)]])
606 objects[l["uuid"]] = l
608 objects[l["uuid"]] = l
611 for r in root_owners:
615 contents[obr["name"]] = obr
616 if "first_name" in obr:
617 contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
620 if r['owner_uuid'] not in objects:
621 contents[r['name']] = r
623 # end with llfuse.lock_released, re-acquire lock
626 self.merge(contents.items(),
628 lambda a, i: a.uuid == i[1]['uuid'],
629 lambda i: ProjectDirectory(self.inode, self.inodes, self.api, i[1], poll=self._poll, poll_time=self._poll_time))
630 except Exception as e:
634 class FileHandle(object):
635 '''Connects a numeric file handle to a File or Directory object that has
636 been opened by the client.'''
638 def __init__(self, fh, entry):
643 class Inodes(object):
644 '''Manage the set of inodes. This is the mapping from a numeric id
645 to a concrete File or Directory object'''
649 self._counter = llfuse.ROOT_INODE
def __getitem__(self, item):
    """Return the object registered under inode number *item*.

    A missing inode raises KeyError from the underlying table.
    """
    entries = self._entries
    return entries[item]
def __setitem__(self, key, item):
    """Register *item* under inode number *key*, replacing any earlier entry."""
    entries = self._entries
    entries[key] = item
658 return self._entries.iterkeys()
661 return self._entries.items()
def __contains__(self, k):
    """Report whether inode number *k* is currently registered."""
    entries = self._entries
    return k in entries
666 def add_entry(self, entry):
667 entry.inode = self._counter
668 self._entries[entry.inode] = entry
def del_entry(self, entry):
    """Remove *entry* from the inode table.

    llfuse is first asked to invalidate the entry's inode; only after that
    call returns is the entry deleted from ``self._entries``. An inode that
    is not registered raises KeyError.
    """
    ino = entry.inode
    llfuse.invalidate_inode(ino)
    del self._entries[ino]
676 class Operations(llfuse.Operations):
677 '''This is the main interface with llfuse. The methods on this object are
678 called by llfuse threads to service FUSE events to query and read from
681 llfuse has its own global lock which is acquired before calling a request handler,
682 so request handlers do not run concurrently unless the lock is explicitly released
683 using "with llfuse.lock_released:"'''
685 def __init__(self, uid, gid):
686 super(Operations, self).__init__()
688 self.inodes = Inodes()
692 # dict of inode to filehandle
693 self._filehandles = {}
694 self._filehandles_counter = 1
696 # Other threads that need to wait until the fuse driver
697 # is fully initialized should wait() on this event object.
698 self.initlock = threading.Event()
701 # Allow threads that are waiting for the driver to be finished
702 # initializing to continue
705 def access(self, inode, mode, ctx):
708 def getattr(self, inode):
709 if inode not in self.inodes:
710 raise llfuse.FUSEError(errno.ENOENT)
712 e = self.inodes[inode]
714 entry = llfuse.EntryAttributes()
717 entry.entry_timeout = 300
718 entry.attr_timeout = 300
720 entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
721 if isinstance(e, Directory):
722 entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
724 entry.st_mode |= stat.S_IFREG
727 entry.st_uid = self.uid
728 entry.st_gid = self.gid
731 entry.st_size = e.size()
733 entry.st_blksize = 512
734 entry.st_blocks = (e.size()/512)+1
735 entry.st_atime = int(e.atime())
736 entry.st_mtime = int(e.mtime())
737 entry.st_ctime = int(e.mtime())
741 def lookup(self, parent_inode, name):
742 _logger.debug("arv-mount lookup: parent_inode %i name %s",
749 if parent_inode in self.inodes:
750 p = self.inodes[parent_inode]
752 inode = p.parent_inode
754 inode = p[name].inode
757 return self.getattr(inode)
759 raise llfuse.FUSEError(errno.ENOENT)
761 def open(self, inode, flags):
762 if inode in self.inodes:
763 p = self.inodes[inode]
765 raise llfuse.FUSEError(errno.ENOENT)
767 if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
768 raise llfuse.FUSEError(errno.EROFS)
770 if isinstance(p, Directory):
771 raise llfuse.FUSEError(errno.EISDIR)
773 fh = self._filehandles_counter
774 self._filehandles_counter += 1
775 self._filehandles[fh] = FileHandle(fh, p)
778 def read(self, fh, off, size):
779 _logger.debug("arv-mount read %i %i %i", fh, off, size)
780 if fh in self._filehandles:
781 handle = self._filehandles[fh]
783 raise llfuse.FUSEError(errno.EBADF)
786 with llfuse.lock_released:
787 return handle.entry.readfrom(off, size)
789 raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
    """Drop the open-file handle *fh*.

    Handles that are not (or no longer) registered are silently ignored.
    """
    self._filehandles.pop(fh, None)
795 def opendir(self, inode):
796 _logger.debug("arv-mount opendir: inode %i", inode)
798 if inode in self.inodes:
799 p = self.inodes[inode]
801 raise llfuse.FUSEError(errno.ENOENT)
803 if not isinstance(p, Directory):
804 raise llfuse.FUSEError(errno.ENOTDIR)
806 fh = self._filehandles_counter
807 self._filehandles_counter += 1
808 if p.parent_inode in self.inodes:
809 parent = self.inodes[p.parent_inode]
811 raise llfuse.FUSEError(errno.EIO)
813 self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
816 def readdir(self, fh, off):
817 _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
819 if fh in self._filehandles:
820 handle = self._filehandles[fh]
822 raise llfuse.FUSEError(errno.EBADF)
824 _logger.debug("arv-mount handle.entry %s", handle.entry)
827 while e < len(handle.entry):
828 if handle.entry[e][1].inode in self.inodes:
829 yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
def releasedir(self, fh):
    """Drop the directory handle *fh* that was created by opendir().

    Unknown or already-released handles are ignored, as release() does.
    The other handlers in this class report failures by raising
    llfuse.FUSEError, so a bare KeyError from ``del`` should not escape
    this request handler on a stale or duplicate handle.
    """
    self._filehandles.pop(fh, None)
836 st = llfuse.StatvfsData()
837 st.f_bsize = 64 * 1024
850 # The llfuse documentation recommends only overloading functions that
851 # are actually implemented, as the default implementation will raise ENOSYS.
852 # However, there is a bug in the llfuse default implementation of create()
853 # "create() takes exactly 5 positional arguments (6 given)" which will crash
855 # The workaround is to implement it with the proper number of parameters,
856 # and then everything works out.
def create(self, p1, p2, p3, p4, p5):
    """Refuse file creation; this mount is read-only.

    The five positional parameters are intentionally unused. They only give
    this override the argument count llfuse calls it with (see the
    workaround note preceding this method). Always raises
    llfuse.FUSEError(errno.EROFS).
    """
    read_only = errno.EROFS
    raise llfuse.FUSEError(read_only)