4904: Fix documentation issues. Add comments to arv-web.py.
[arvados.git] / services / arv-web / arv-web.py
index 9b4ccdf9113f37a0ce593bdbf19afef139bcfef3..d3f7b84b7af6891f301f44277f5fe4ee38b8c344 100755 (executable)
@@ -1,5 +1,9 @@
 #!/usr/bin/env python
 
+# arv-web enables you to run a custom web service from the contents of an Arvados collection.
+#
+# See http://doc.arvados.org/user/topics/arv-web.html
+
 import arvados
 import subprocess
 from arvados_fuse import Operations, SafeApi, CollectionDirectory
@@ -12,45 +16,31 @@ import argparse
 import logging
 import signal
 import sys
+import functools
 
-logging.basicConfig(level=logging.INFO)
-
-parser = argparse.ArgumentParser()
-parser.add_argument('--project', type=str, required=True, help="Project to watch")
-parser.add_argument('--port', type=int, default=8080, help="Local bind port")
-parser.add_argument('--image', type=str, help="Docker image to run")
-
-args = parser.parse_args()
-
-api = SafeApi(arvados.config)
-project = args.project
-docker_image = args.image
-port = args.port
-evqueue = Queue.Queue()
-
-def run_fuse_mount(collection):
-    global api
-
+# Run an arvados_fuse mount under the control of the local process.  This lets
+# us switch out the contents of the directory without having to unmount and
+# remount.
+def run_fuse_mount(api, collection):
     mountdir = tempfile.mkdtemp()
 
-    operations = Operations(os.getuid(), os.getgid(), "utf-8", True)
-    operations.inodes.add_entry(CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection))
+    operations = Operations(os.getuid(), os.getgid(), "utf-8")
+    cdir = CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection)
+    operations.inodes.add_entry(cdir)
 
     # Initialize the fuse connection
     llfuse.init(operations, mountdir, ['allow_other'])
 
-    t = threading.Thread(None, lambda: llfuse.main())
+    t = threading.Thread(None, llfuse.main)
     t.start()
 
     # wait until the driver is finished initializing
     operations.initlock.wait()
 
-    return mountdir
-
-def on_message(ev):
-    global project
-    global evqueue
+    return (mountdir, cdir)
 
+# Handle messages from Arvados event bus.
+def on_message(project, evqueue, ev):
     if 'event_type' in ev:
         old_attr = None
         if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
@@ -60,80 +50,199 @@ def on_message(ev):
             return
 
         et = ev['event_type']
-        if ev['event_type'] == 'update' and ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
-            if args.project == ev['properties']['new_attributes']['owner_uuid']:
-                et = 'add'
-            else:
+        if ev['event_type'] == 'update':
+            if ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
+                if args.project_uuid == ev['properties']['new_attributes']['owner_uuid']:
+                    et = 'add'
+                else:
+                    et = 'remove'
+            if ev['properties']['new_attributes']['expires_at'] is not None:
                 et = 'remove'
 
         evqueue.put((project, et, ev['object_uuid']))
 
-collection = api.collections().list(filters=[["owner_uuid", "=", project]],
-                    limit=1,
-                    order='modified_at desc').execute()['items'][0]['uuid']
+def main(argv):
+    logger = logging.getLogger('arvados.arv-web')
+    logger.setLevel(logging.INFO)
 
-ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], on_message)
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
+    parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
+    parser.add_argument('--image', type=str, help="Docker image to run")
 
-signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
+    args = parser.parse_args(argv)
 
-loop = True
-cid = None
-while loop:
-    loop = False
-    logging.info("Mounting %s" % collection)
-    mountdir = run_fuse_mount(collection)
+    api = SafeApi(arvados.config)
+    project = args.project_uuid
+    docker_image = args.image
+    port = args.port
+    evqueue = Queue.Queue()
+
+    collections = api.collections().list(filters=[["owner_uuid", "=", project]],
+                        limit=1,
+                        order='modified_at desc').execute()['items']
+    newcollection = collections[0]['uuid'] if len(collections) > 0 else None
+    collection = None
+
+    ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], functools.partial(on_message, project, evqueue))
+
+    signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
+
+    loop = True
+    cid = None
+    docker_proc = None
+    prev_docker_image = None
+    mountdir = None
 
     try:
-        if not args.image:
-            if os.path.exists(os.path.join(mountdir, "docker_image")):
-                with open(os.path.join(mountdir, "docker_image")) as di:
-                    docker_image = di.read().strip()
-            else:
-                logging.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
-                sys.exit(1)
-
-        logging.info("Starting docker container")
-        cid = subprocess.check_output(["docker", "run",
-                                       "--detach=true",
-                                       "--publish=%i:80" % (port),
-                                       "--volume=%s:/mnt:ro" % mountdir,
-                                       docker_image])
-        cid = cid.rstrip()
-        logging.info("Container id is %s" % cid)
-
-        logging.info("Waiting for events")
-        running = True
-        loop = True
-        while running:
+        while loop:
+            loop = False
+            if newcollection != collection:
+                collection = newcollection
+                if not mountdir:
+                    (mountdir, cdir) = run_fuse_mount(api, collection)
+
+                with llfuse.lock:
+                    cdir.clear()
+                    if collection:
+                        # Switch the FUSE directory object so that it stores
+                        # the newly selected collection
+                        logger.info("Mounting %s", collection)
+                        cdir.collection_locator = collection
+                        cdir.collection_object = None
+                        cdir.update()
+
             try:
-                eq = evqueue.get(True, 1)
-                logging.info("%s %s" % (eq[1], eq[2]))
-                newcollection = collection
-                if eq[1] in ('add', 'update', 'create'):
-                    newcollection = eq[2]
-                elif eq[1] == 'remove':
-                    newcollection = api.collections().list(filters=[["owner_uuid", "=", project]],
-                                                        limit=1,
-                                                        order='modified_at desc').execute()['items'][0]['uuid']
-                if newcollection != collection:
-                    logging.info("restarting web service")
-                    collection = newcollection
-                    running = False
-            except Queue.Empty:
-                pass
-    except (KeyboardInterrupt):
-        logging.info("Got keyboard interrupt")
-        ws.close()
-        loop = False
-    except Exception as e:
-        logging.exception(str(e))
-        ws.close()
-        loop = False
+                try:
+                    if collection:
+                        if not args.image:
+                            docker_image = None
+
+                            # FUSE is asynchronous, so there is a race between
+                            # the directory being updated above and the kernel
+                            # cache being refreshed.  This manifests as the
+                            # bizarre behavior where os.path.exists() returns
+                            # True, but open() raises "file not found".  The
+                            # workaround is to keep trying until the kernel
+                            # catches up.
+                            while not docker_image and os.path.exists(os.path.join(mountdir, "docker_image")):
+                                try:
+                                    with open(os.path.join(mountdir, "docker_image")) as di:
+                                        docker_image = di.read().strip()
+                                except IOError as e:
+                                    pass
+
+                        if not docker_image:
+                            logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
+
+                        if docker_image and ((docker_image != prev_docker_image) or cid is None):
+                            if cid:
+                                logger.info("Stopping Docker container")
+                                subprocess.check_call(["docker", "stop", cid])
+                                cid = None
+                                docker_proc = None
+
+                            if docker_image:
+                                logger.info("Starting Docker container %s", docker_image)
+                                ciddir = tempfile.mkdtemp()
+                                cidfilepath = os.path.join(ciddir, "cidfile")
+                                docker_proc = subprocess.Popen(["docker", "run",
+                                                                "--cidfile=%s" % (cidfilepath),
+                                                                "--publish=%i:80" % (port),
+                                                                "--volume=%s:/mnt:ro" % mountdir,
+                                                                docker_image])
+                                cid = None
+                                while not cid and docker_proc.poll() is None:
+                                    try:
+                                        with open(cidfilepath) as cidfile:
+                                            cid = cidfile.read().strip()
+                                    except IOError:
+                                        pass
+                                try:
+                                    os.unlink(cidfilepath)
+                                    os.rmdir(ciddir)
+                                except OSError:
+                                    pass
+
+                                prev_docker_image = docker_image
+                                logger.info("Container id %s", cid)
+                        elif cid:
+                            logger.info("Sending refresh signal to container")
+                            # Send SIGHUP to all the processes inside the
+                            # container.  By convention, services are expected
+                            # to reload their configuration.  If they die
+                            # instead, that's okay, because then we'll just
+                            # start a new container.
+                            #
+                            # Getting the services inside the container to
+                            # refresh turned out to be really hard.  Here are
+                            # some of the other things I tried:
+                            #
+                            # docker kill --signal=HUP               # no effect
+                            # docker_proc.send_signal(signal.SIGHUP) # no effect
+                            # os.killpg(os.getpgid(docker_proc.pid), signal.SIGHUP) # docker-proxy dies as collateral damage
+                            # docker exec apache2ctl restart         # only works if service is using apache.
+                            # Sending HUP directly to the processes inside the container: permission denied
+
+                            subprocess.check_call(["docker", "exec", cid, "killall", "--regexp", ".*", "--signal", "HUP"])
+                    elif cid:
+                        logger.info("Stopping docker container")
+                        subprocess.check_call(["docker", "stop", cid])
+                except subprocess.CalledProcessError:
+                    cid = None
+
+                if not cid:
+                    logger.warning("No service running!  Will wait for a new collection to appear in the project.")
+                else:
+                    logger.info("Waiting for events")
+
+                running = True
+                loop = True
+                while running:
+                    # Main run loop.  Wait on project events, signals, or the
+                    # Docker container stopping.
+
+                    try:
+                        # Poll the queue with a 1 second timeout; without a
+                        # timeout the Python runtime doesn't have a chance to
+                        # process SIGINT or SIGTERM.
+                        eq = evqueue.get(True, 1)
+                        logger.info("%s %s", eq[1], eq[2])
+                        newcollection = collection
+                        if eq[1] in ('add', 'update', 'create'):
+                            newcollection = eq[2]
+                        elif eq[1] == 'remove':
+                            collections = api.collections().list(filters=[["owner_uuid", "=", project]],
+                                                                limit=1,
+                                                                order='modified_at desc').execute()['items']
+                            newcollection = collections[0]['uuid'] if len(collections) > 0 else None
+                        running = False
+                    except Queue.Empty:
+                        pass
+
+                    if docker_proc and docker_proc.poll() is not None:
+                        logger.warning("Service has terminated.  Will try to restart.")
+                        cid = None
+                        docker_proc = None
+                        running = False
+
+            except (KeyboardInterrupt):
+                logger.info("Got keyboard interrupt")
+                ws.close()
+                loop = False
+            except Exception as e:
+                logger.exception(e)
+                ws.close()
+                loop = False
     finally:
         if cid:
-            logging.info("Stopping docker container")
-            cid = subprocess.call(["docker", "stop", cid])
+            logger.info("Stopping docker container")
+            subprocess.check_call(["docker", "stop", cid])
+
+        if mountdir:
+            logger.info("Unmounting")
+            subprocess.call(["fusermount", "-u", "-z", mountdir])
+            os.rmdir(mountdir)
 
-        logging.info("Unmounting")
-        subprocess.call(["fusermount", "-u", "-z", mountdir])
-        os.rmdir(mountdir)
+if __name__ == '__main__':
+    main(sys.argv[1:])