3 # arv-web enables you to run a custom web service from the contents of an Arvados collection.
5 # See http://doc.arvados.org/user/topics/arv-web.html
9 from arvados_fuse import Operations, SafeApi, CollectionDirectory
21 logger = logging.getLogger('arvados.arv-web')
22 logger.setLevel(logging.INFO)
25 def __init__(self, project, docker_image, port):
26 self.project = project
29 self.prev_docker_image = None
31 self.collection = None
32 self.override_docker_image = docker_image
34 self.evqueue = Queue.Queue()
35 self.api = SafeApi(arvados.config)
37 if arvados.util.group_uuid_pattern.match(project) is None:
38 raise arvados.errors.ArgumentError("Project uuid is not valid")
40 collections = self.api.collections().list(filters=[["owner_uuid", "=", project]],
42 order='modified_at desc').execute()['items']
43 self.newcollection = collections[0]['uuid'] if collections else None
45 self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)
47 def check_docker_running(self):
48 # It would be less hacky to use "docker events" than poll "docker ps"
49 # but that would require writing a bigger pile of code.
51 ps = subprocess.check_output(["docker", "ps", "--no-trunc=true", "--filter=status=running"])
52 for l in ps.splitlines():
53 if l.startswith(self.cid):
57 # Handle messages from Arvados event bus.
58 def on_message(self, ev):
59 if 'event_type' in ev:
61 if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
62 old_attr = ev['properties']['old_attributes']
63 if self.project not in (ev['properties']['new_attributes']['owner_uuid'],
64 old_attr['owner_uuid'] if old_attr else None):
68 if ev['event_type'] == 'update':
69 if ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
70 if self.project == ev['properties']['new_attributes']['owner_uuid']:
74 if ev['properties']['new_attributes']['expires_at'] is not None:
77 self.evqueue.put((self.project, et, ev['object_uuid']))
79 # Run an arvados_fuse mount under the control of the local process. This lets
80 # us switch out the contents of the directory without having to unmount and
82 def run_fuse_mount(self):
83 self.mountdir = tempfile.mkdtemp()
85 self.operations = Operations(os.getuid(), os.getgid(), "utf-8")
86 self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
87 self.operations.inodes.add_entry(self.cdir)
89 # Initialize the fuse connection
90 llfuse.init(self.operations, self.mountdir, ['allow_other'])
92 t = threading.Thread(None, llfuse.main)
95 # wait until the driver is finished initializing
96 self.operations.initlock.wait()
98 def mount_collection(self):
99 if self.newcollection != self.collection:
100 self.collection = self.newcollection
101 if not self.mountdir and self.collection:
102 self.run_fuse_mount()
107 # Switch the FUSE directory object so that it stores
108 # the newly selected collection
110 logger.info("Mounting %s", self.collection)
112 logger.info("Mount is empty")
113 self.cdir.change_collection(self.collection)
116 def stop_docker(self):
118 logger.info("Stopping Docker container")
119 subprocess.call(["docker", "stop", self.cid])
122 def run_docker(self):
124 if self.collection is None:
129 if self.override_docker_image:
130 docker_image = self.override_docker_image
134 if "docker_image" in self.cdir:
135 docker_image = self.cdir["docker_image"].readfrom(0, 1024).strip()
142 has_reload = "reload" in self.cdir
146 if docker_image is None:
147 logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
151 if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
152 logger.info("Running container reload command")
153 subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
158 logger.info("Starting Docker container %s", docker_image)
159 self.cid = subprocess.check_output(["docker", "run",
161 "--publish=%i:80" % (self.port),
162 "--volume=%s:/mnt:ro" % self.mountdir,
163 docker_image]).strip()
165 self.prev_docker_image = docker_image
166 logger.info("Container id %s", self.cid)
168 except subprocess.CalledProcessError:
171 def wait_for_events(self):
173 logger.warning("No service running! Will wait for a new collection to appear in the project.")
175 logger.info("Waiting for events")
180 # Main run loop. Wait on project events, signals, or the
181 # Docker container stopping.
184 # Poll the queue with a 1 second timeout, if we have no
185 # timeout the Python runtime doesn't have a chance to
186 # process SIGINT or SIGTERM.
187 eq = self.evqueue.get(True, 1)
188 logger.info("%s %s", eq[1], eq[2])
189 self.newcollection = self.collection
190 if eq[1] in ('add', 'update', 'create'):
191 self.newcollection = eq[2]
192 elif eq[1] == 'remove':
193 collections = self.api.collections().list(filters=[["owner_uuid", "=", self.project]],
195 order='modified_at desc').execute()['items']
196 self.newcollection = collections[0]['uuid'] if collections else None
201 if self.cid and not self.check_docker_running():
202 logger.warning("Service has terminated. Will try to restart.")
211 self.mount_collection()
214 self.wait_for_events()
215 except (KeyboardInterrupt):
216 logger.info("Got keyboard interrupt")
219 except Exception as e:
220 logger.exception("Caught fatal exception, shutting down")
227 logger.info("Unmounting")
228 subprocess.call(["fusermount", "-u", self.mountdir])
229 os.rmdir(self.mountdir)
233 parser = argparse.ArgumentParser()
234 parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
235 parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
236 parser.add_argument('--image', type=str, help="Docker image to run")
238 args = parser.parse_args(argv)
240 signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
243 arvweb = ArvWeb(args.project_uuid, args.image, args.port)
245 except arvados.errors.ArgumentError as e:
251 if __name__ == '__main__':
252 sys.exit(main(sys.argv[1:]))