import threading
import Queue
import argparse
+import logging
+import signal
+import sys
+
+logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument('--project', type=str, required=True, help="Project to watch")
+parser.add_argument('--port', type=int, default=8080, help="Local bind port")
parser.add_argument('--image', type=str, required=True, help="Docker image to run")
args = parser.parse_args()
api = SafeApi(arvados.config)
project = args.project
docker_image = args.image
+port = args.port
evqueue = Queue.Queue()
def run_fuse_mount(collection):
global project
global evqueue
- import pprint
- pprint.pprint(ev)
-
if 'event_type' in ev:
old_attr = None
if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
evqueue.put((project, et, ev['object_uuid']))
-filters = [['owner_uuid', '=', project],
- ['uuid', 'is_a', 'arvados#collection']]
-
-collection = api.collections().list(filters=filters,
+collection = api.collections().list(filters=[["owner_uuid", "=", project]],
limit=1,
order='modified_at desc').execute()['items'][0]['uuid']
-ws = arvados.events.subscribe(api, filters, on_message)
+ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], on_message)
+
+signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
-while True:
+loop = True
+cid = None
+while loop:
+ logging.info("Mounting %s" % collection)
mountdir = run_fuse_mount(collection)
try:
+ logging.info("Starting docker container")
cid = subprocess.check_output(["docker", "run",
"--detach=true",
+ "--publish=%i:80" % (port),
"--volume=%s:/mnt:ro" % mountdir,
docker_image])
+
+ logging.info("Waiting for events")
running = True
while running:
- eq = evqueue.get()
- if eq[1] == 'add' or eq[1] == 'update':
- collection = eq[2]
- running = False
-
- cid = subprocess.call(["docker", "stop", cid.rstrip()])
+ try:
+ eq = evqueue.get(True, 1)
+ logging.info("%s %s, restarting web service" % (eq[1], eq[2]))
+ if eq[1] == 'add' or eq[1] == 'update':
+ collection = eq[2]
+ running = False
+ if eq[1] == 'remove':
+ collection = api.collections().list(filters=[["owner_uuid", "=", project]],
+ limit=1,
+ order='modified_at desc').execute()['items'][0]['uuid']
+ running = False
+
+ except Queue.Empty:
+ pass
+ except KeyboardInterrupt:
+ logging.info("Got keyboard interrupt")
+ ws.close()
+ loop = False
finally:
+ if cid:
+ logging.info("Stopping docker container")
+ cid = subprocess.call(["docker", "stop", cid.rstrip()])
+
+ logging.info("Unmounting")
subprocess.call(["fusermount", "-u", "-z", mountdir])
os.rmdir(mountdir)