+++ /dev/null
-import arvados
-import subprocess
-from arvados_fuse import Operations, SafeApi, CollectionDirectory
-import tempfile
-import os
-import llfuse
-import threading
-import Queue
-import argparse
-import logging
-import signal
-import sys
-
-logging.basicConfig(level=logging.INFO)
-
-parser = argparse.ArgumentParser()
-parser.add_argument('--project', type=str, required=True, help="Project to watch")
-parser.add_argument('--port', type=int, default=8080, help="Local bind port")
-parser.add_argument('--image', type=str, required=True, help="Docker image to run")
-
-args = parser.parse_args()
-
-api = SafeApi(arvados.config)
-project = args.project
-docker_image = args.image
-port = args.port
-evqueue = Queue.Queue()
-
-def run_fuse_mount(collection):
- global api
-
- mountdir = tempfile.mkdtemp()
-
- operations = Operations(os.getuid(), os.getgid(), "utf-8", True)
- operations.inodes.add_entry(CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection))
-
- # Initialize the fuse connection
- llfuse.init(operations, mountdir, ['allow_other'])
-
- t = threading.Thread(None, lambda: llfuse.main())
- t.start()
-
- # wait until the driver is finished initializing
- operations.initlock.wait()
-
- return mountdir
-
-def on_message(ev):
- global project
- global evqueue
-
- if 'event_type' in ev:
- old_attr = None
- if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
- old_attr = ev['properties']['old_attributes']
- if project not in (ev['properties']['new_attributes']['owner_uuid'],
- old_attr['owner_uuid'] if old_attr else None):
- return
-
- et = ev['event_type']
- if ev['event_type'] == 'update' and ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
- if args.project == ev['properties']['new_attributes']['owner_uuid']:
- et = 'add'
- else:
- et = 'remove'
-
- evqueue.put((project, et, ev['object_uuid']))
-
-collection = api.collections().list(filters=[["owner_uuid", "=", project]],
- limit=1,
- order='modified_at desc').execute()['items'][0]['uuid']
-
-ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], on_message)
-
-signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
-
-loop = True
-cid = None
-while loop:
- logging.info("Mounting %s" % collection)
- mountdir = run_fuse_mount(collection)
- try:
- logging.info("Starting docker container")
- cid = subprocess.check_output(["docker", "run",
- "--detach=true",
- "--publish=%i:80" % (port),
- "--volume=%s:/mnt:ro" % mountdir,
- docker_image])
- cid = cid.rstrip()
- logging.info("Container id is %s" % cid)
-
- logging.info("Waiting for events")
- running = True
- while running:
- try:
- eq = evqueue.get(True, 1)
- logging.info("%s %s" % (eq[1], eq[2]))
- newcollection = collection
- if eq[1] in ('add', 'update', 'create'):
- newcollection = eq[2]
- elif eq[1] == 'remove':
- newcollection = api.collections().list(filters=[["owner_uuid", "=", project]],
- limit=1,
- order='modified_at desc').execute()['items'][0]['uuid']
- if newcollection != collection:
- logging.info("restarting web service")
- collection = newcollection
- running = False
- except Queue.Empty:
- pass
- except (KeyboardInterrupt):
- logging.info("Got keyboard interrupt")
- ws.close()
- loop = False
- except Exception as e:
- logging.exception(str(e))
- ws.close()
- loop = False
- finally:
- if cid:
- logging.info("Stopping docker container")
- cid = subprocess.call(["docker", "stop", cid])
-
- logging.info("Unmounting")
- subprocess.call(["fusermount", "-u", "-z", mountdir])
- os.rmdir(mountdir)