4904: Fixed event listening. Terminates properly on signals. Tested and works now.
author Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 14 Jan 2015 21:14:05 +0000 (16:14 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 14 Jan 2015 21:14:05 +0000 (16:14 -0500)
runit.py

index 80111588bea92058895a1c7cced847b61ce76c87..9ab9859a4c4cb0429c19cc72ffacdf90824c1296 100644 (file)
--- a/runit.py
+++ b/runit.py
@@ -7,9 +7,15 @@ import llfuse
 import threading
 import Queue
 import argparse
+import logging
+import signal
+import sys
+
+logging.basicConfig(level=logging.INFO)
 
 parser = argparse.ArgumentParser()
 parser.add_argument('--project', type=str, required=True, help="Project to watch")
+parser.add_argument('--port', type=int, default=8080, help="Local bind port")
 parser.add_argument('--image', type=str, required=True, help="Docker image to run")
 
 args = parser.parse_args()
@@ -17,6 +23,7 @@ args = parser.parse_args()
 api = SafeApi(arvados.config)
 project = args.project
 docker_image = args.image
+port = args.port
 evqueue = Queue.Queue()
 
 def run_fuse_mount(collection):
@@ -42,9 +49,6 @@ def on_message(ev):
     global project
     global evqueue
 
-    import pprint
-    pprint.pprint(ev)
-
     if 'event_type' in ev:
         old_attr = None
         if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
@@ -62,30 +66,53 @@ def on_message(ev):
 
         evqueue.put((project, et, ev['object_uuid']))
 
-filters = [['owner_uuid', '=', project],
-           ['uuid', 'is_a', 'arvados#collection']]
-
-collection = api.collections().list(filters=filters,
+collection = api.collections().list(filters=[["owner_uuid", "=", project]],
                     limit=1,
                     order='modified_at desc').execute()['items'][0]['uuid']
 
-ws = arvados.events.subscribe(api, filters, on_message)
+ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], on_message)
+
+signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
 
-while True:
+loop = True
+cid = None
+while loop:
+    logging.info("Mounting %s" % collection)
     mountdir = run_fuse_mount(collection)
     try:
+        logging.info("Starting docker container")
         cid = subprocess.check_output(["docker", "run",
                                        "--detach=true",
+                                       "--publish=%i:80" % (port),
                                        "--volume=%s:/mnt:ro" % mountdir,
                                        docker_image])
+
+        logging.info("Waiting for events")
         running = True
         while running:
-            eq = evqueue.get()
-            if eq[1] == 'add' or eq[1] == 'update':
-                collection = eq[2]
-                running = False
-
-        cid = subprocess.call(["docker", "stop", cid.rstrip()])
+            try:
+                eq = evqueue.get(True, 1)
+                logging.info("%s %s, restarting web service" % (eq[1], eq[2]))
+                if eq[1] == 'add' or eq[1] == 'update':
+                    collection = eq[2]
+                    running = False
+                if eq[1] == 'remove':
+                    collection = api.collections().list(filters=[["owner_uuid", "=", project]],
+                                                        limit=1,
+                                                        order='modified_at desc').execute()['items'][0]['uuid']
+                    running = False
+
+            except Queue.Empty:
+                pass
+    except KeyboardInterrupt:
+        logging.info("Got keyboard interrupt")
+        ws.close()
+        loop = False
     finally:
+        if cid:
+            logging.info("Stopping docker container")
+            cid = subprocess.call(["docker", "stop", cid.rstrip()])
+
+        logging.info("Unmounting")
         subprocess.call(["fusermount", "-u", "-z", mountdir])
         os.rmdir(mountdir)