2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 # arv-web enables you to run a custom web service from the contents of an Arvados collection.
8 # See http://doc.arvados.org/user/topics/arv-web.html
11 from arvados.safeapi import ThreadSafeApiCache
13 from arvados_fuse import Operations, CollectionDirectory
25 logger = logging.getLogger('arvados.arv-web')
26 logger.setLevel(logging.INFO)
29 def __init__(self, project, docker_image, port):
30 self.project = project
33 self.prev_docker_image = None
35 self.collection = None
36 self.override_docker_image = docker_image
38 self.evqueue = Queue.Queue()
39 self.api = ThreadSafeApiCache(arvados.config.settings())
41 if arvados.util.group_uuid_pattern.match(project) is None:
42 raise arvados.errors.ArgumentError("Project uuid is not valid")
44 collections = self.api.collections().list(filters=[["owner_uuid", "=", project]],
46 order='modified_at desc').execute()['items']
47 self.newcollection = collections[0]['uuid'] if collections else None
49 self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)
51 def check_docker_running(self):
52 # It would be less hacky to use "docker events" than poll "docker ps"
53 # but that would require writing a bigger pile of code.
55 ps = subprocess.check_output(["docker", "ps", "--no-trunc=true", "--filter=status=running"])
56 for l in ps.splitlines():
57 if l.startswith(self.cid):
61 # Handle messages from Arvados event bus.
62 def on_message(self, ev):
63 if 'event_type' in ev:
65 if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
66 old_attr = ev['properties']['old_attributes']
67 if self.project not in (ev['properties']['new_attributes']['owner_uuid'],
68 old_attr['owner_uuid'] if old_attr else None):
72 if ev['event_type'] == 'update':
73 if ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
74 if self.project == ev['properties']['new_attributes']['owner_uuid']:
78 if ev['properties']['new_attributes']['trash_at'] is not None:
81 self.evqueue.put((self.project, et, ev['object_uuid']))
83 # Run an arvados_fuse mount under the control of the local process. This lets
84 # us switch out the contents of the directory without having to unmount and
86 def run_fuse_mount(self):
87 self.mountdir = tempfile.mkdtemp()
89 self.operations = Operations(os.getuid(), os.getgid(), self.api, "utf-8")
90 self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
91 self.operations.inodes.add_entry(self.cdir)
93 # Initialize the fuse connection
94 llfuse.init(self.operations, self.mountdir, ['allow_other'])
96 t = threading.Thread(None, llfuse.main)
99 # wait until the driver is finished initializing
100 self.operations.initlock.wait()
102 def mount_collection(self):
103 if self.newcollection != self.collection:
104 self.collection = self.newcollection
105 if not self.mountdir and self.collection:
106 self.run_fuse_mount()
111 # Switch the FUSE directory object so that it stores
112 # the newly selected collection
114 logger.info("Mounting %s", self.collection)
116 logger.info("Mount is empty")
117 self.cdir.change_collection(self.collection)
120 def stop_docker(self):
122 logger.info("Stopping Docker container")
123 subprocess.call(["docker", "stop", self.cid])
126 def run_docker(self):
128 if self.collection is None:
133 if self.override_docker_image:
134 docker_image = self.override_docker_image
138 if "docker_image" in self.cdir:
139 docker_image = self.cdir["docker_image"].readfrom(0, 1024).strip()
146 has_reload = "reload" in self.cdir
150 if docker_image is None:
151 logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
155 if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
156 logger.info("Running container reload command")
157 subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
162 logger.info("Starting Docker container %s", docker_image)
163 self.cid = subprocess.check_output(["docker", "run",
165 "--publish=%i:80" % (self.port),
166 "--volume=%s:/mnt:ro" % self.mountdir,
167 docker_image]).strip()
169 self.prev_docker_image = docker_image
170 logger.info("Container id %s", self.cid)
172 except subprocess.CalledProcessError:
175 def wait_for_events(self):
177 logger.warning("No service running! Will wait for a new collection to appear in the project.")
179 logger.info("Waiting for events")
184 # Main run loop. Wait on project events, signals, or the
185 # Docker container stopping.
188 # Poll the queue with a 1 second timeout, if we have no
189 # timeout the Python runtime doesn't have a chance to
190 # process SIGINT or SIGTERM.
191 eq = self.evqueue.get(True, 1)
192 logger.info("%s %s", eq[1], eq[2])
193 self.newcollection = self.collection
194 if eq[1] in ('add', 'update', 'create'):
195 self.newcollection = eq[2]
196 elif eq[1] == 'remove':
197 collections = self.api.collections().list(filters=[["owner_uuid", "=", self.project]],
199 order='modified_at desc').execute()['items']
200 self.newcollection = collections[0]['uuid'] if collections else None
205 if self.cid and not self.check_docker_running():
206 logger.warning("Service has terminated. Will try to restart.")
215 self.mount_collection()
218 self.wait_for_events()
219 except (KeyboardInterrupt):
220 logger.info("Got keyboard interrupt")
223 except Exception as e:
224 logger.exception("Caught fatal exception, shutting down")
231 logger.info("Unmounting")
232 subprocess.call(["fusermount", "-u", self.mountdir])
233 os.rmdir(self.mountdir)
237 parser = argparse.ArgumentParser()
238 parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
239 parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
240 parser.add_argument('--image', type=str, help="Docker image to run")
242 args = parser.parse_args(argv)
244 signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
247 arvweb = ArvWeb(args.project_uuid, args.image, args.port)
249 except arvados.errors.ArgumentError as e:
255 if __name__ == '__main__':
256 sys.exit(main(sys.argv[1:]))