# services/arv-web/arv-web.py -- post-patch content of the hunk that starts at
# original line 18.  The module's earlier import lines (logging, Queue,
# arvados, llfuse, subprocess, tempfile, threading, os, argparse and the
# arvados_fuse names used below) precede this hunk and are not repeated here.
import signal
import sys
import functools

logger = logging.getLogger('arvados.arv-web')
logger.setLevel(logging.INFO)


class ArvWeb(object):
    """Serve the most recently modified collection of a project from Docker.

    The collection is exposed through a FUSE mount owned by this process and
    bind-mounted read-only at /mnt inside a Docker container.  Collection
    events received from the Arvados event bus are queued by on_message()
    and consumed by wait_for_events(), which triggers a remount and a
    container reload/restart.
    """

    def __init__(self, project, docker_image, port):
        """Set up state, pick the initial collection and subscribe to events.

        project      -- uuid of the project (group) to watch
        docker_image -- image name overriding the collection's
                        'docker_image' file, or None
        port         -- host port published to port 80 of the container

        Raises arvados.errors.ArgumentError if project is not a group uuid.
        """
        self.project = project
        self.loop = True
        self.cid = None
        self.docker_proc = None
        self.prev_docker_image = None
        self.mountdir = None
        self.collection = None
        self.override_docker_image = docker_image
        self.port = port
        self.evqueue = Queue.Queue()
        self.api = SafeApi(arvados.config)

        if arvados.util.group_uuid_pattern.match(project) is None:
            raise arvados.errors.ArgumentError("Project uuid is not valid")

        # Start with the most recently modified collection in the project.
        collections = self.api.collections().list(
            filters=[["owner_uuid", "=", project]],
            limit=1,
            order='modified_at desc').execute()['items']
        self.newcollection = collections[0]['uuid'] if len(collections) > 0 else None

        self.ws = arvados.events.subscribe(
            self.api,
            [["object_uuid", "is_a", "arvados#collection"]],
            self.on_message)

    # Handle messages from Arvados event bus.
    def on_message(self, ev):
        """Queue a (project, event_type, object_uuid) tuple for a relevant event.

        Events that carry no 'event_type', or whose old and new owner are
        both different from the watched project, are ignored.  An 'update'
        that changes the owner is reported as 'add' (moved into the project)
        or 'remove' (moved out); an 'update' whose new attributes have a
        non-None 'expires_at' is reported as 'remove'.
        """
        if 'event_type' not in ev:
            return

        properties = ev['properties']
        new_attr = properties['new_attributes']
        # 'old_attributes' may be absent or empty; normalize that to None.
        old_attr = properties.get('old_attributes') or None

        owners = (new_attr['owner_uuid'],
                  old_attr['owner_uuid'] if old_attr else None)
        if self.project not in owners:
            return

        et = ev['event_type']
        if et == 'update':
            # Only compare owners when the old attributes are available.
            if old_attr is not None and new_attr['owner_uuid'] != old_attr['owner_uuid']:
                if self.project == new_attr['owner_uuid']:
                    et = 'add'
                else:
                    et = 'remove'
            if new_attr['expires_at'] is not None:
                et = 'remove'

        self.evqueue.put((self.project, et, ev['object_uuid']))

    # Run an arvados_fuse mount under the control of the local process.  This
    # lets us switch out the contents of the directory without having to
    # unmount and remount.
    def run_fuse_mount(self):
        """Create a temporary mount point and start the FUSE driver thread.

        Sets self.mountdir, self.operations and self.cdir, and returns once
        the driver signals that initialization has finished.
        """
        self.mountdir = tempfile.mkdtemp()

        self.operations = Operations(os.getuid(), os.getgid(), "utf-8")
        self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes,
                                        self.api, 2, self.collection)
        self.operations.inodes.add_entry(self.cdir)

        # Initialize the fuse connection
        llfuse.init(self.operations, self.mountdir, ['allow_other'])

        t = threading.Thread(None, llfuse.main)
        t.start()

        # wait until the driver is finished initializing
        self.operations.initlock.wait()

    def mount_collection(self):
        """Point the FUSE mount at self.newcollection if it has changed.

        The mount is created lazily, the first time there is a collection to
        show.  When the selected collection becomes None the existing
        directory is only cleared.
        """
        if self.newcollection == self.collection:
            return

        self.collection = self.newcollection
        if not self.mountdir and self.collection:
            self.run_fuse_mount()

        if self.mountdir:
            with llfuse.lock:
                self.cdir.clear()
                if self.collection:
                    # Switch the FUSE directory object so that it stores
                    # the newly selected collection
                    logger.info("Mounting %s", self.collection)
                    self.cdir.change_collection(self.collection)

    def stop_docker(self):
        """Stop the running container, if any, and forget its id and process."""
        if self.cid:
            logger.info("Stopping Docker container")
            subprocess.check_call(["docker", "stop", self.cid])
            self.cid = None
            self.docker_proc = None

    def run_docker(self):
        """Start, reload or stop the container to match the current collection.

        The image comes from the command-line override or, failing that,
        from the 'docker_image' file in the collection.  If the image is
        unchanged, a container is running and the collection has a 'reload'
        entry, /mnt/reload is executed inside the container instead of
        restarting it.  A failing docker command (CalledProcessError) is
        handled by clearing self.cid.
        """
        try:
            if self.collection is None:
                self.stop_docker()
                return

            docker_image = None
            if self.override_docker_image:
                docker_image = self.override_docker_image
            else:
                try:
                    with llfuse.lock:
                        if "docker_image" in self.cdir:
                            docker_image = self.cdir["docker_image"].readfrom(0, 1024).strip()
                except IOError:
                    pass

            has_reload = False
            try:
                with llfuse.lock:
                    has_reload = "reload" in self.cdir
            except IOError:
                pass

            if docker_image is None:
                logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
                self.stop_docker()
                return

            if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
                logger.info("Running container reload command")
                subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
                return

            self.stop_docker()

            logger.info("Starting Docker container %s", docker_image)
            ciddir = tempfile.mkdtemp()
            cidfilepath = os.path.join(ciddir, "cidfile")
            self.docker_proc = subprocess.Popen(["docker", "run",
                                                 "--cidfile=%s" % (cidfilepath),
                                                 "--publish=%i:80" % (self.port),
                                                 "--volume=%s:/mnt:ro" % self.mountdir,
                                                 docker_image])
            self.cid = None
            while self.cid is None and self.docker_proc.poll() is None:
                try:
                    with open(cidfilepath) as cidfile:
                        cid = cidfile.read().strip()
                    # The file may exist before the id has been written;
                    # keep polling until it has content.
                    if cid:
                        self.cid = cid
                except IOError:
                    # XXX check for ENOENT
                    pass

            try:
                if os.path.exists(cidfilepath):
                    os.unlink(cidfilepath)
                os.rmdir(ciddir)
            except OSError:
                pass

            self.prev_docker_image = docker_image
            logger.info("Container id %s", self.cid)

        except subprocess.CalledProcessError:
            self.cid = None

    def wait_for_events(self):
        """Block until a project event arrives or the container exits.

        On an event, self.newcollection is updated (the event's collection
        for add/update/create; the project's most recently modified
        collection for remove).  self.loop is set to True so that run()
        performs another iteration.
        """
        if not self.cid:
            logger.warning("No service running! Will wait for a new collection to appear in the project.")
        else:
            logger.info("Waiting for events")

        running = True
        self.loop = True
        while running:
            # Main run loop.  Wait on project events, signals, or the
            # Docker container stopping.

            try:
                # Poll the queue with a 1 second timeout, if we have no
                # timeout the Python runtime doesn't have a chance to
                # process SIGINT or SIGTERM.
                eq = self.evqueue.get(True, 1)
                logger.info("%s %s", eq[1], eq[2])
                self.newcollection = self.collection
                if eq[1] in ('add', 'update', 'create'):
                    self.newcollection = eq[2]
                elif eq[1] == 'remove':
                    collections = self.api.collections().list(
                        filters=[["owner_uuid", "=", self.project]],
                        limit=1,
                        order='modified_at desc').execute()['items']
                    self.newcollection = collections[0]['uuid'] if len(collections) > 0 else None
                running = False
            except Queue.Empty:
                pass

            if self.docker_proc and self.docker_proc.poll() is not None:
                logger.warning("Service has terminated. Will try to restart.")
                self.cid = None
                self.docker_proc = None
                running = False

    def run(self):
        """Run the mount / container / wait cycle until interrupted.

        On exit the container is stopped and the FUSE mount is unmounted and
        its directory removed.
        """
        try:
            while self.loop:
                self.loop = False
                self.mount_collection()
                try:
                    self.run_docker()
                    self.wait_for_events()
                except (KeyboardInterrupt):
                    logger.info("Got keyboard interrupt")
                    self.ws.close()
                    self.loop = False
                except Exception:
                    logger.exception("Caught fatal exception, shutting down")
                    self.ws.close()
                    self.loop = False
        finally:
            if self.cid:
                logger.info("Stopping docker container")
                subprocess.call(["docker", "stop", self.cid])

            if self.mountdir:
                logger.info("Unmounting")
                subprocess.call(["fusermount", "-u", self.mountdir])
                os.rmdir(self.mountdir)


def main(argv):
    """Parse command-line arguments and run an ArvWeb instance."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
    parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
    parser.add_argument('--image', type=str, help="Docker image to run")

    args = parser.parse_args(argv)

    # Turn SIGTERM into SystemExit so that cleanup in ArvWeb.run() executes.
    signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))

    try:
        arvweb = ArvWeb(args.project_uuid, args.image, args.port)
        arvweb.run()
    except arvados.errors.ArgumentError as e:
        logger.error(e)


if __name__ == '__main__':
    main(sys.argv[1:])