#!/usr/bin/env python # Copyright (C) The Arvados Authors. All rights reserved. # # SPDX-License-Identifier: AGPL-3.0 # arv-web enables you to run a custom web service from the contents of an Arvados collection. # # See http://doc.arvados.org/user/topics/arv-web.html import arvados from arvados.safeapi import ThreadSafeApiCache import subprocess from arvados_fuse import Operations, CollectionDirectory import tempfile import os import llfuse import threading import Queue import argparse import logging import signal import sys import functools logger = logging.getLogger('arvados.arv-web') logger.setLevel(logging.INFO) class ArvWeb(object): def __init__(self, project, docker_image, port): self.project = project self.loop = True self.cid = None self.prev_docker_image = None self.mountdir = None self.collection = None self.override_docker_image = docker_image self.port = port self.evqueue = Queue.Queue() self.api = ThreadSafeApiCache(arvados.config.settings()) if arvados.util.group_uuid_pattern.match(project) is None: raise arvados.errors.ArgumentError("Project uuid is not valid") collections = self.api.collections().list(filters=[["owner_uuid", "=", project]], limit=1, order='modified_at desc').execute()['items'] self.newcollection = collections[0]['uuid'] if collections else None self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message) def check_docker_running(self): # It would be less hacky to use "docker events" than poll "docker ps" # but that would require writing a bigger pile of code. if self.cid: ps = subprocess.check_output(["docker", "ps", "--no-trunc=true", "--filter=status=running"]) for l in ps.splitlines(): if l.startswith(self.cid): return True return False # Handle messages from Arvados event bus. def on_message(self, ev): if 'event_type' in ev: old_attr = None if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']: old_attr = ev['properties']['old_attributes'] if self.project not in (ev['properties']['new_attributes']['owner_uuid'], old_attr['owner_uuid'] if old_attr else None): return et = ev['event_type'] if ev['event_type'] == 'update': if ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']: if self.project == ev['properties']['new_attributes']['owner_uuid']: et = 'add' else: et = 'remove' if ev['properties']['new_attributes']['trash_at'] is not None: et = 'remove' self.evqueue.put((self.project, et, ev['object_uuid'])) # Run an arvados_fuse mount under the control of the local process. This lets # us switch out the contents of the directory without having to unmount and # remount. def run_fuse_mount(self): self.mountdir = tempfile.mkdtemp() self.operations = Operations(os.getuid(), os.getgid(), self.api, "utf-8") self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection) self.operations.inodes.add_entry(self.cdir) # Initialize the fuse connection llfuse.init(self.operations, self.mountdir, ['allow_other']) t = threading.Thread(None, llfuse.main) t.start() # wait until the driver is finished initializing self.operations.initlock.wait() def mount_collection(self): if self.newcollection != self.collection: self.collection = self.newcollection if not self.mountdir and self.collection: self.run_fuse_mount() if self.mountdir: with llfuse.lock: self.cdir.clear() # Switch the FUSE directory object so that it stores # the newly selected collection if self.collection: logger.info("Mounting %s", self.collection) else: logger.info("Mount is empty") self.cdir.change_collection(self.collection) def stop_docker(self): if self.cid: logger.info("Stopping Docker container") subprocess.call(["docker", "stop", self.cid]) self.cid = None def run_docker(self): try: if self.collection is None: self.stop_docker() return docker_image = None if self.override_docker_image: docker_image = self.override_docker_image else: try: with llfuse.lock: if "docker_image" in self.cdir: docker_image = self.cdir["docker_image"].readfrom(0, 1024).strip() except IOError as e: pass has_reload = False try: with llfuse.lock: has_reload = "reload" in self.cdir except IOError as e: pass if docker_image is None: logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.") self.stop_docker() return if docker_image == self.prev_docker_image and self.cid is not None and has_reload: logger.info("Running container reload command") subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"]) return self.stop_docker() logger.info("Starting Docker container %s", docker_image) self.cid = subprocess.check_output(["docker", "run", "--detach=true", "--publish=%i:80" % (self.port), "--volume=%s:/mnt:ro" % self.mountdir, docker_image]).strip() self.prev_docker_image = docker_image logger.info("Container id %s", self.cid) except subprocess.CalledProcessError: self.cid = None def wait_for_events(self): if not self.cid: logger.warning("No service running! Will wait for a new collection to appear in the project.") else: logger.info("Waiting for events") running = True self.loop = True while running: # Main run loop. Wait on project events, signals, or the # Docker container stopping. try: # Poll the queue with a 1 second timeout, if we have no # timeout the Python runtime doesn't have a chance to # process SIGINT or SIGTERM. eq = self.evqueue.get(True, 1) logger.info("%s %s", eq[1], eq[2]) self.newcollection = self.collection if eq[1] in ('add', 'update', 'create'): self.newcollection = eq[2] elif eq[1] == 'remove': collections = self.api.collections().list(filters=[["owner_uuid", "=", self.project]], limit=1, order='modified_at desc').execute()['items'] self.newcollection = collections[0]['uuid'] if collections else None running = False except Queue.Empty: pass if self.cid and not self.check_docker_running(): logger.warning("Service has terminated. Will try to restart.") self.cid = None running = False def run(self): try: while self.loop: self.loop = False self.mount_collection() try: self.run_docker() self.wait_for_events() except (KeyboardInterrupt): logger.info("Got keyboard interrupt") self.ws.close() self.loop = False except Exception as e: logger.exception("Caught fatal exception, shutting down") self.ws.close() self.loop = False finally: self.stop_docker() if self.mountdir: logger.info("Unmounting") subprocess.call(["fusermount", "-u", self.mountdir]) os.rmdir(self.mountdir) def main(argv): parser = argparse.ArgumentParser() parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch") parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)") parser.add_argument('--image', type=str, help="Docker image to run") args = parser.parse_args(argv) signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0)) try: arvweb = ArvWeb(args.project_uuid, args.image, args.port) arvweb.run() except arvados.errors.ArgumentError as e: logger.error(e) return 1 return 0 if __name__ == '__main__': sys.exit(main(sys.argv[1:]))