#!/usr/bin/env python
+# arv-web enables you to run a custom web service from the contents of an Arvados collection.
+#
+# See http://doc.arvados.org/user/topics/arv-web.html
+
import arvados
import subprocess
from arvados_fuse import Operations, SafeApi, CollectionDirectory
import logging
import signal
import sys
+import functools
+import time
-logging.basicConfig(level=logging.INFO)
-
-parser = argparse.ArgumentParser()
-parser.add_argument('--project', type=str, required=True, help="Project to watch")
-parser.add_argument('--port', type=int, default=8080, help="Local bind port")
-parser.add_argument('--image', type=str, help="Docker image to run")
-
-args = parser.parse_args()
-
-api = SafeApi(arvados.config)
-project = args.project
-docker_image = args.image
-port = args.port
-evqueue = Queue.Queue()
-
-def run_fuse_mount(collection):
- global api
-
+# Run an arvados_fuse mount under the control of the local process. This lets
+# us switch out the contents of the directory without having to unmount and
+# remount.
+#
+# api: Arvados API client handed to CollectionDirectory.
+# collection: uuid/locator of the collection exposed under the mount root.
+# Returns (mountdir, cdir): the temporary mount point created here, and the
+# CollectionDirectory registered under llfuse.ROOT_INODE, which the caller can
+# later repoint at a different collection. main() unmounts (fusermount -u -z)
+# and removes mountdir on exit.
+def run_fuse_mount(api, collection):
    mountdir = tempfile.mkdtemp()
-    operations = Operations(os.getuid(), os.getgid(), "utf-8", True)
-    operations.inodes.add_entry(CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection))
+    operations = Operations(os.getuid(), os.getgid(), "utf-8")
+    # Keep a handle on the directory object (rather than adding it anonymously)
+    # so its contents can be swapped later without remounting.
+    cdir = CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection)
+    operations.inodes.add_entry(cdir)
    # Initialize the fuse connection
    llfuse.init(operations, mountdir, ['allow_other'])
-    t = threading.Thread(None, lambda: llfuse.main())
+    # Serve FUSE requests on a background thread so this function can return.
+    t = threading.Thread(None, llfuse.main)
    t.start()
    # wait until the driver is finished initializing
    operations.initlock.wait()
-    return mountdir
-
-def on_message(ev):
- global project
- global evqueue
+    return (mountdir, cdir)
+# Handle messages from the Arvados event bus.
+#
+# Translates a collection event into a (project, event_type, object_uuid)
+# tuple on evqueue for main() to consume. An 'update' that moves a collection
+# into `project` is reported as 'add'; one that moves it elsewhere, or that
+# sets expires_at, is reported as 'remove'.
+#
+# project: uuid of the watched project (bound via functools.partial).
+# evqueue: queue polled by main().
+# ev: decoded event message dict.
+#
+# NOTE(review): as written, any event whose properties carry a non-empty
+# 'old_attributes' returns early below, so the 'update' branch only sees
+# events without old attributes -- confirm this filter is intended.
+def on_message(project, evqueue, ev):
    if 'event_type' in ev:
        old_attr = None
        if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
            return
        et = ev['event_type']
-        if ev['event_type'] == 'update' and ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
-            if args.project == ev['properties']['new_attributes']['owner_uuid']:
-                et = 'add'
-            else:
+        if ev['event_type'] == 'update':
+            new_attrs = ev['properties']['new_attributes']
+            # old_attributes is absent or empty by the time we get here (see
+            # the early return above), so indexing it directly would raise;
+            # treat an unknown previous owner as an ownership change.
+            old_attrs = ev['properties'].get('old_attributes') or {}
+            if new_attrs['owner_uuid'] != old_attrs.get('owner_uuid'):
+                # Compare against the project passed in by the caller; there
+                # is no module-level `args` any more (it is local to main()).
+                if project == new_attrs['owner_uuid']:
+                    et = 'add'
+                else:
+                    et = 'remove'
+            if new_attrs.get('expires_at') is not None:
                et = 'remove'
        evqueue.put((project, et, ev['object_uuid']))
-collection = api.collections().list(filters=[["owner_uuid", "=", project]],
-                                        limit=1,
-                                        order='modified_at desc').execute()['items'][0]['uuid']
+# Entry point for arv-web.
+#
+# Parses the command line and mounts the most recently modified collection in
+# --project-uuid through FUSE. It then runs a Docker container, using the image
+# from --image or from the collection's 'docker_image' file, with the mount at
+# /mnt (read-only) and container port 80 published on --port. Project events
+# then switch the mounted collection and refresh, restart or stop the container.
+#
+# argv: command-line arguments, excluding the program name.
+def main(argv):
+    logger = logging.getLogger('arvados.arv-web')
+    logger.setLevel(logging.INFO)
-ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], on_message)
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
+    parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
+    parser.add_argument('--image', type=str, help="Docker image to run")
-signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
+    args = parser.parse_args(argv)
-loop = True
-cid = None
-while loop:
-    loop = False
-    logging.info("Mounting %s" % collection)
-    mountdir = run_fuse_mount(collection)
+    api = SafeApi(arvados.config)
+    project = args.project_uuid
+    docker_image = args.image
+    port = args.port
+    evqueue = Queue.Queue()
+
+    # Start with the most recently modified collection in the project, if any.
+    collections = api.collections().list(filters=[["owner_uuid", "=", project]],
+                                        limit=1,
+                                        order='modified_at desc').execute()['items']
+    newcollection = collections[0]['uuid'] if len(collections) > 0 else None
+    collection = None
+
+    ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], functools.partial(on_message, project, evqueue))
+
+    # Exit via SystemExit on SIGTERM so the finally clause below still stops
+    # the container and unmounts. The handler parameter is named signum so it
+    # does not shadow the signal module.
+    signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))
+
+    loop = True
+    cid = None
+    docker_proc = None
+    prev_docker_image = None
+    mountdir = None
    try:
-        if not args.image:
-            if os.path.exists(os.path.join(mountdir, "docker_image")):
-                with open(os.path.join(mountdir, "docker_image")) as di:
-                    docker_image = di.read().strip()
-            else:
-                logging.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
-                sys.exit(1)
-
-        logging.info("Starting docker container %s" % docker_image)
-        cid = subprocess.check_output(["docker", "run",
-                                       "--detach=true",
-                                       "--publish=%i:80" % (port),
-                                       "--volume=%s:/mnt:ro" % mountdir,
-                                       docker_image])
-        cid = cid.rstrip()
-        logging.info("Container id is %s" % cid)
-
-        logging.info("Waiting for events")
-        running = True
-        loop = True
-        while running:
+        while loop:
+            loop = False
+            if newcollection != collection:
+                collection = newcollection
+                if not mountdir:
+                    (mountdir, cdir) = run_fuse_mount(api, collection)
+
+                with llfuse.lock:
+                    cdir.clear()
+                    if collection:
+                        # Switch the FUSE directory object so that it stores
+                        # the newly selected collection
+                        logger.info("Mounting %s", collection)
+                        cdir.collection_locator = collection
+                        cdir.collection_object = None
+                        cdir.update()
+
        try:
-            eq = evqueue.get(True, 1)
-            logging.info("%s %s" % (eq[1], eq[2]))
-            newcollection = collection
-            if eq[1] in ('add', 'update', 'create'):
-                newcollection = eq[2]
-            elif eq[1] == 'remove':
-                newcollection = api.collections().list(filters=[["owner_uuid", "=", project]],
-                                                      limit=1,
-                                                      order='modified_at desc').execute()['items'][0]['uuid']
-            if newcollection != collection:
-                logging.info("restarting web service")
-                collection = newcollection
-                running = False
-        except Queue.Empty:
-            pass
-    except (KeyboardInterrupt):
-        logging.info("Got keyboard interrupt")
-        ws.close()
-        loop = False
-    except Exception as e:
-        logging.exception(str(e))
-        ws.close()
-        loop = False
+            try:
+                if collection:
+                    if not args.image:
+                        docker_image = None
+
+                        # FUSE is asynchronous, so there is a race between
+                        # the directory being updated above and the kernel
+                        # cache being refreshed. This manifests as the
+                        # bizarre behavior where os.path.exists() returns
+                        # True, but open() raises "file not found". The
+                        # workaround is to keep trying until the kernel
+                        # catches up. A successful read ends the loop even
+                        # if the file is empty (reported just below), rather
+                        # than retrying forever.
+                        image_path = os.path.join(mountdir, "docker_image")
+                        while os.path.exists(image_path):
+                            try:
+                                with open(image_path) as di:
+                                    docker_image = di.read().strip()
+                                break
+                            except IOError:
+                                # Back off briefly rather than spinning the CPU.
+                                time.sleep(0.1)
+
+                    if not docker_image:
+                        logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
+
+                    if docker_image and ((docker_image != prev_docker_image) or cid is None):
+                        if cid:
+                            logger.info("Stopping Docker container")
+                            subprocess.check_call(["docker", "stop", cid])
+                            cid = None
+                            docker_proc = None
+
+                        if docker_image:
+                            logger.info("Starting Docker container %s", docker_image)
+                            ciddir = tempfile.mkdtemp()
+                            cidfilepath = os.path.join(ciddir, "cidfile")
+                            docker_proc = subprocess.Popen(["docker", "run",
+                                                            "--cidfile=%s" % (cidfilepath),
+                                                            "--publish=%i:80" % (port),
+                                                            "--volume=%s:/mnt:ro" % mountdir,
+                                                            docker_image])
+                            cid = None
+                            while not cid and docker_proc.poll() is None:
+                                try:
+                                    with open(cidfilepath) as cidfile:
+                                        cid = cidfile.read().strip()
+                                except IOError:
+                                    pass
+                                if not cid:
+                                    # The cidfile may not be written yet; wait
+                                    # briefly instead of spinning the CPU.
+                                    time.sleep(0.1)
+                            try:
+                                os.unlink(cidfilepath)
+                                os.rmdir(ciddir)
+                            except OSError:
+                                pass
+
+                            prev_docker_image = docker_image
+                            logger.info("Container id %s", cid)
+                    elif cid:
+                        logger.info("Sending refresh signal to container")
+                        # Send SIGHUP to all the processes inside the
+                        # container. By convention, services are expected
+                        # to reload their configuration. If they die
+                        # instead, that's okay, because then we'll just
+                        # start a new container.
+                        #
+                        # Getting the services inside the container to
+                        # refresh turned out to be really hard. Here are
+                        # some of the other things I tried:
+                        #
+                        # docker kill --signal=HUP # no effect
+                        # docker_proc.send_signal(signal.SIGHUP) # no effect
+                        # os.killpg(os.getpgid(docker_proc.pid), signal.SIGHUP) # docker-proxy dies as collateral damage
+                        # docker exec apache2ctl restart # only works if service is using apache.
+                        # Sending HUP directly to the processes inside the container: permission denied
+
+                        subprocess.check_call(["docker", "exec", cid, "killall", "--regexp", ".*", "--signal", "HUP"])
+                elif cid:
+                    logger.info("Stopping docker container")
+                    subprocess.check_call(["docker", "stop", cid])
+            except subprocess.CalledProcessError as e:
+                # A failed docker stop/exec leaves the container state unknown;
+                # record why before forgetting the container id.
+                logger.warning("Docker command failed: %s", e)
+                cid = None
+
+            if not cid:
+                logger.warning("No service running! Will wait for a new collection to appear in the project.")
+            else:
+                logger.info("Waiting for events")
+
+            running = True
+            loop = True
+            while running:
+                # Main run loop. Wait on project events, signals, or the
+                # Docker container stopping.
+
+                try:
+                    # Poll the queue with a 1 second timeout, if we have no
+                    # timeout the Python runtime doesn't have a chance to
+                    # process SIGINT or SIGTERM.
+                    eq = evqueue.get(True, 1)
+                    logger.info("%s %s", eq[1], eq[2])
+                    newcollection = collection
+                    if eq[1] in ('add', 'update', 'create'):
+                        newcollection = eq[2]
+                    elif eq[1] == 'remove':
+                        collections = api.collections().list(filters=[["owner_uuid", "=", project]],
+                                                            limit=1,
+                                                            order='modified_at desc').execute()['items']
+                        newcollection = collections[0]['uuid'] if len(collections) > 0 else None
+                    running = False
+                except Queue.Empty:
+                    pass
+
+                if docker_proc and docker_proc.poll() is not None:
+                    logger.warning("Service has terminated. Will try to restart.")
+                    cid = None
+                    docker_proc = None
+                    running = False
+
+    except (KeyboardInterrupt):
+        logger.info("Got keyboard interrupt")
+        ws.close()
+        loop = False
+    except Exception as e:
+        logger.exception(e)
+        ws.close()
+        loop = False
    finally:
        if cid:
-            logging.info("Stopping docker container")
-            cid = subprocess.call(["docker", "stop", cid])
+            logger.info("Stopping docker container")
+            # Use call(), not check_call(): a failed stop must not raise out of
+            # this finally clause and skip the unmount below.
+            subprocess.call(["docker", "stop", cid])
+
+        if mountdir:
+            logger.info("Unmounting")
+            subprocess.call(["fusermount", "-u", "-z", mountdir])
+            os.rmdir(mountdir)
-        logging.info("Unmounting")
-        subprocess.call(["fusermount", "-u", "-z", mountdir])
-        os.rmdir(mountdir)
+
+
+if __name__ == '__main__':
+    main(sys.argv[1:])