#!/usr/bin/env python

# arv-web enables you to run a custom web service from the contents of an Arvados collection.
#
# See http://doc.arvados.org/user/topics/arv-web.html

import arvados
import subprocess
from arvados_fuse import Operations, SafeApi, CollectionDirectory
# NOTE(review): the six imports below were outside the visible diff context.
# They are inferred from the names this module uses (arvados.events.subscribe,
# llfuse.*, os.*, Queue.*, tempfile.mkdtemp, threading.Thread) -- confirm
# against the full file.
import arvados.events
import llfuse
import os
import Queue
import tempfile
import threading
import argparse
import logging
import signal
import sys
import functools

logger = logging.getLogger('arvados.arv-web')
logger.setLevel(logging.INFO)


class ArvWeb(object):
    """Serve the newest collection of an Arvados project from a Docker container.

    The selected collection is exposed through a FUSE mount that is bind
    mounted read-only at /mnt inside the container.  Events received from the
    Arvados event bus are queued and cause the mount to be switched and the
    container to be reloaded or restarted.
    """

    def __init__(self, project, docker_image, port):
        """Validate the project uuid, pick the initial collection and subscribe to events.

        project      -- uuid of the project to watch
        docker_image -- Docker image to run, or None to read the image name
                        from a 'docker_image' file in the collection
        port         -- host port published to port 80 of the container

        Raises arvados.errors.ArgumentError when the project uuid does not
        match arvados.util.group_uuid_pattern.
        """
        self.project = project
        self.loop = True
        self.cid = None
        self.prev_docker_image = None
        self.mountdir = None
        self.collection = None
        self.override_docker_image = docker_image
        self.port = port
        self.evqueue = Queue.Queue()
        self.api = SafeApi(arvados.config)

        if arvados.util.group_uuid_pattern.match(project) is None:
            raise arvados.errors.ArgumentError("Project uuid is not valid")

        self.newcollection = self._latest_collection_uuid()

        self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)

    def _latest_collection_uuid(self):
        """Return the uuid of the most recently modified collection owned by
        the project, or None when the query returns no items."""
        collections = self.api.collections().list(
            filters=[["owner_uuid", "=", self.project]],
            limit=1,
            order='modified_at desc').execute()['items']
        return collections[0]['uuid'] if collections else None

    def check_docker_running(self):
        """Return True when a container id is recorded and 'docker ps' lists it as running."""
        # It would be less hacky to use "docker events" than poll "docker ps"
        # but that would require writing a bigger pile of code.
        if not self.cid:
            return False
        ps = subprocess.check_output(["docker", "ps", "--no-trunc=true", "--filter=status=running"])
        return any(line.startswith(self.cid) for line in ps.splitlines())

    # Handle messages from Arvados event bus.
    def on_message(self, ev):
        """Queue a (project, event_type, object_uuid) tuple for events that involve the project.

        Messages without an 'event_type' key, and events whose old and new
        owner are both some other project, are ignored.  An 'update' that
        changes the owner is queued as 'add' or 'remove', and an 'update'
        whose new attributes carry a non-None 'expires_at' is queued as
        'remove'.
        """
        if 'event_type' not in ev:
            return

        # Absent or None attribute dictionaries are treated as empty so that a
        # malformed or partial event cannot raise inside the event callback.
        properties = ev.get('properties') or {}
        new_attr = properties.get('new_attributes') or {}
        old_attr = properties.get('old_attributes') or {}
        new_owner = new_attr.get('owner_uuid')
        old_owner = old_attr.get('owner_uuid')

        if self.project not in (new_owner, old_owner):
            return

        et = ev['event_type']
        if et == 'update':
            if new_owner != old_owner:
                et = 'add' if self.project == new_owner else 'remove'
            if new_attr.get('expires_at') is not None:
                et = 'remove'

        self.evqueue.put((self.project, et, ev['object_uuid']))

    # Run an arvados_fuse mount under the control of the local process.  This lets
    # us switch out the contents of the directory without having to unmount and
    # remount.
    def run_fuse_mount(self):
        """Create a temporary mount point and start llfuse.main in a new thread.

        Sets self.mountdir, self.operations and self.cdir, and returns once
        the operations object's initlock has been signalled.
        """
        self.mountdir = tempfile.mkdtemp()

        self.operations = Operations(os.getuid(), os.getgid(), "utf-8")
        self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
        self.operations.inodes.add_entry(self.cdir)

        # Initialize the fuse connection
        llfuse.init(self.operations, self.mountdir, ['allow_other'])

        t = threading.Thread(None, llfuse.main)
        t.start()

        # wait until the driver is finished initializing
        self.operations.initlock.wait()

    def mount_collection(self):
        """Make the mount show self.newcollection if it differs from the current collection.

        The FUSE mount is created on the first non-empty collection; after
        that the existing directory object is cleared and pointed at the new
        collection (which may be None).
        """
        if self.newcollection == self.collection:
            return

        self.collection = self.newcollection
        if not self.mountdir and self.collection:
            self.run_fuse_mount()

        if self.mountdir:
            with llfuse.lock:
                self.cdir.clear()
                # Switch the FUSE directory object so that it stores
                # the newly selected collection
                if self.collection:
                    logger.info("Mounting %s", self.collection)
                else:
                    logger.info("Mount is empty")
                self.cdir.change_collection(self.collection)

    def stop_docker(self):
        """Run 'docker stop' on the recorded container, if any, and forget its id."""
        if self.cid:
            logger.info("Stopping Docker container")
            subprocess.call(["docker", "stop", self.cid])
            self.cid = None

    def _collection_docker_image(self):
        """Return the stripped first 1024 bytes of the collection's
        'docker_image' file, or None when it is absent or cannot be read."""
        try:
            with llfuse.lock:
                if "docker_image" in self.cdir:
                    return self.cdir["docker_image"].readfrom(0, 1024).strip()
        except IOError as e:
            # Best effort: the caller reports the missing image name.
            logger.warning("Could not read 'docker_image' from collection: %s", e)
        return None

    def _collection_has_reload(self):
        """Return True when the mounted collection contains an entry named 'reload'."""
        try:
            with llfuse.lock:
                return "reload" in self.cdir
        except IOError as e:
            logger.warning("Could not check collection for 'reload': %s", e)
        return False

    def run_docker(self):
        """Bring the container in line with the currently mounted collection.

        Stops the container when there is no collection or no image name.
        When the image is unchanged, a container is recorded and the
        collection has a 'reload' entry, runs /mnt/reload inside the
        container.  Otherwise stops any recorded container and starts a new
        detached one.  If a docker command exits non-zero the failure is
        logged and the service is left stopped.
        """
        try:
            if self.collection is None:
                self.stop_docker()
                return

            if self.override_docker_image:
                docker_image = self.override_docker_image
            else:
                docker_image = self._collection_docker_image()

            has_reload = self._collection_has_reload()

            if docker_image is None:
                logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
                self.stop_docker()
                return

            if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
                logger.info("Running container reload command")
                subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
                return

            self.stop_docker()

            logger.info("Starting Docker container %s", docker_image)
            self.cid = subprocess.check_output(["docker", "run",
                                                "--detach=true",
                                                "--publish=%i:80" % (self.port),
                                                "--volume=%s:/mnt:ro" % self.mountdir,
                                                docker_image]).strip()

            self.prev_docker_image = docker_image
            logger.info("Container id %s", self.cid)

        except subprocess.CalledProcessError as e:
            logger.error("Docker command failed: %s", e)
            # If the reload command failed the container is still recorded in
            # self.cid; stop it rather than just forgetting the id, otherwise
            # it would keep running and keep the published port.  After a
            # failed "docker run" self.cid is already None and this is a no-op.
            self.stop_docker()

    def wait_for_events(self):
        """Block until a project event arrives or the container stops running.

        On an event, self.newcollection is set to the collection that should
        be served next.  self.loop is set to True so that run() performs
        another iteration after this method returns.
        """
        if not self.cid:
            logger.warning("No service running! Will wait for a new collection to appear in the project.")
        else:
            logger.info("Waiting for events")

        running = True
        self.loop = True
        while running:
            # Main run loop.  Wait on project events, signals, or the
            # Docker container stopping.

            try:
                # Poll the queue with a 1 second timeout, if we have no
                # timeout the Python runtime doesn't have a chance to
                # process SIGINT or SIGTERM.
                eq = self.evqueue.get(True, 1)
                logger.info("%s %s", eq[1], eq[2])
                self.newcollection = self.collection
                if eq[1] in ('add', 'update', 'create'):
                    self.newcollection = eq[2]
                elif eq[1] == 'remove':
                    self.newcollection = self._latest_collection_uuid()
                running = False
            except Queue.Empty:
                pass

            if self.cid and not self.check_docker_running():
                logger.warning("Service has terminated. Will try to restart.")
                self.cid = None
                running = False

    def run(self):
        """Run the mount / container / wait cycle until interrupted or a fatal error occurs.

        On exit the container is stopped and, if a mount point was created,
        it is unmounted and removed.
        """
        try:
            while self.loop:
                self.loop = False
                self.mount_collection()
                try:
                    self.run_docker()
                    self.wait_for_events()
                except KeyboardInterrupt:
                    logger.info("Got keyboard interrupt")
                    self.ws.close()
                    self.loop = False
                except Exception:
                    logger.exception("Caught fatal exception, shutting down")
                    self.ws.close()
                    self.loop = False
        finally:
            self.stop_docker()

            if self.mountdir:
                logger.info("Unmounting")
                subprocess.call(["fusermount", "-u", self.mountdir])
                try:
                    os.rmdir(self.mountdir)
                except OSError as e:
                    # Do not let a failed cleanup mask the exception (if any)
                    # that is ending the run.
                    logger.warning("Could not remove mount point %s: %s", self.mountdir, e)


def main(argv):
    """Parse command line arguments from argv and run an ArvWeb instance."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
    parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
    parser.add_argument('--image', type=str, help="Docker image to run")

    args = parser.parse_args(argv)

    # Turn SIGTERM into SystemExit so the cleanup in ArvWeb.run() executes.
    signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))

    try:
        arvweb = ArvWeb(args.project_uuid, args.image, args.port)
        arvweb.run()
    except arvados.errors.ArgumentError as e:
        logger.error(e)


if __name__ == '__main__':
    main(sys.argv[1:])