3 # arv-web enables you to run a custom web service from the contents of an Arvados collection.
5 # See http://doc.arvados.org/user/topics/arv-web.html
8 from arvados.safeapi import ThreadSafeApiCache
10 from arvados_fuse import Operations, CollectionDirectory
# Module-level logger for arv-web. Its level is set to INFO so that records
# at INFO and above pass this logger's own level check.
22 logger = logging.getLogger('arvados.arv-web')
23 logger.setLevel(logging.INFO)
26 def __init__(self, project, docker_image, port):
# Set up the state used to watch one project and serve its newest collection.
#
# project      -- uuid of the project to watch. It must match
#                 arvados.util.group_uuid_pattern, otherwise
#                 arvados.errors.ArgumentError is raised.
# docker_image -- stored as self.override_docker_image; run_docker checks it
#                 before looking for a "docker_image" entry in the collection.
# port         -- run_docker formats self.port into "--publish=%i:80".
#                 NOTE(review): the assignment to self.port is not visible in
#                 this excerpt -- confirm it exists.
27 self.project = project
# Image used for the most recent container start; run_docker compares against it.
30 self.prev_docker_image = None
# uuid of the collection currently selected for mounting (None = nothing selected).
32 self.collection = None
33 self.override_docker_image = docker_image
# on_message puts (project, event type, object uuid) tuples on this queue and
# the wait loop reads them back with a timeout.
35 self.evqueue = Queue.Queue()
36 self.api = ThreadSafeApiCache(arvados.config.settings())
38 if arvados.util.group_uuid_pattern.match(project) is None:
39 raise arvados.errors.ArgumentError("Project uuid is not valid")
# Initial selection: the first collection owned by the project when ordered by
# modified_at descending, or None when the project owns no collections.
41 collections = self.api.collections().list(filters=[["owner_uuid", "=", project]],
43 order='modified_at desc').execute()['items']
44 self.newcollection = collections[0]['uuid'] if collections else None
# Subscribe to events whose object is a collection; on_message is the callback.
46 self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)
48 def check_docker_running(self):
# Run "docker ps" restricted to running containers and scan its output for a
# line that starts with self.cid. The wait loop evaluates
# "not self.check_docker_running()" to decide that the service has terminated.
# NOTE(review): the return statements are not visible in this excerpt.
49 # It would be less hacky to use "docker events" than poll "docker ps"
50 # but that would require writing a bigger pile of code.
# --no-trunc asks docker not to truncate its output columns, which matters for
# the prefix comparison against self.cid below.
52 ps = subprocess.check_output(["docker", "ps", "--no-trunc=true", "--filter=status=running"])
53 for l in ps.splitlines():
54 if l.startswith(self.cid):
58 # Handle messages from Arvados event bus.
59 def on_message(self, ev):
# Callback registered with arvados.events.subscribe in __init__.
#
# ev is read as a mapping with the keys 'event_type' and 'object_uuid' and a
# 'properties' mapping that holds 'new_attributes' and, optionally,
# 'old_attributes'. The result of the classification is queued on
# self.evqueue as (self.project, et, ev['object_uuid']).
# NOTE(review): the assignments to `et` and any early exits are not visible in
# this excerpt; the wait loop handles the values 'add', 'update', 'create'
# and 'remove'.
60 if 'event_type' in ev:
# old_attributes may be absent or empty, so it is only read when truthy.
62 if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
63 old_attr = ev['properties']['old_attributes']
# Test for events where this project is neither the new owner nor the previous
# owner of the object.
64 if self.project not in (ev['properties']['new_attributes']['owner_uuid'],
65 old_attr['owner_uuid'] if old_attr else None):
# For updates, a change of owner_uuid is examined (is this project the new
# owner?), and a non-None expires_at is examined separately.
69 if ev['event_type'] == 'update':
70 if ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
71 if self.project == ev['properties']['new_attributes']['owner_uuid']:
75 if ev['properties']['new_attributes']['expires_at'] is not None:
78 self.evqueue.put((self.project, et, ev['object_uuid']))
80 # Run an arvados_fuse mount under the control of the local process. This lets us
81 # switch out the contents of the directory without having to unmount and remount.
83 def run_fuse_mount(self):
# Create a temporary directory and set up an llfuse mount on it whose root
# inode is a CollectionDirectory for self.collection.
# Sets self.mountdir, self.operations and self.cdir.
84 self.mountdir = tempfile.mkdtemp()
# Operations is given the uid and gid of the current process.
86 self.operations = Operations(os.getuid(), os.getgid(), "utf-8")
# NOTE(review): the literal 2 is presumably a retry count -- confirm against
# the CollectionDirectory signature.
87 self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
88 self.operations.inodes.add_entry(self.cdir)
90 # Initialize the fuse connection
91 llfuse.init(self.operations, self.mountdir, ['allow_other'])
# Thread object whose target is llfuse.main.
# NOTE(review): the call that starts the thread is not visible in this excerpt.
93 t = threading.Thread(None, llfuse.main)
96 # wait until the driver is finished initializing
97 self.operations.initlock.wait()
99 def mount_collection(self):
# Bring the mount in line with self.newcollection.
#
# When the selection differs from self.collection it is adopted.
# run_fuse_mount is called only when no mount directory exists yet and a
# collection is selected. The visible lines also log the outcome and pass the
# selection to self.cdir.change_collection.
# NOTE(review): the branch structure around the two log messages is not
# visible in this excerpt.
100 if self.newcollection != self.collection:
101 self.collection = self.newcollection
102 if not self.mountdir and self.collection:
103 self.run_fuse_mount()
108 # Switch the FUSE directory object so that it stores
109 # the newly selected collection
111 logger.info("Mounting %s", self.collection)
113 logger.info("Mount is empty")
114 self.cdir.change_collection(self.collection)
117 def stop_docker(self):
# Stop the service container by running "docker stop" on self.cid.
# subprocess.call returns the exit status instead of raising, so a failing
# "docker stop" does not raise here.
# NOTE(review): any guard on self.cid being set is not visible in this excerpt.
119 logger.info("Stopping Docker container")
120 subprocess.call(["docker", "stop", self.cid])
123 def run_docker(self):
# Start the Docker container that serves the mounted collection, or ask an
# already recorded container to reload.
#
# Image selection, as far as visible here: self.override_docker_image is
# checked first; a "docker_image" entry in the mounted collection supplies the
# name from its first 1024 bytes, stripped of surrounding whitespace.
# An error is logged when no image name was found.
# NOTE(review): the try/else/return lines of this method are not visible in
# this excerpt, so the exact branch structure should be confirmed.
125 if self.collection is None:
130 if self.override_docker_image:
131 docker_image = self.override_docker_image
135 if "docker_image" in self.cdir:
136 docker_image = self.cdir["docker_image"].readfrom(0, 1024).strip()
# Whether the collection has a "reload" entry; it is run as /mnt/reload below.
143 has_reload = "reload" in self.cdir
147 if docker_image is None:
148 logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
# Reload path: same image as the previous start, a container id is recorded,
# and the collection provides a reload entry.
152 if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
153 logger.info("Running container reload command")
154 subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
# Start path: the visible arguments publish host port self.port to container
# port 80 and mount self.mountdir read-only at /mnt. The stripped output of
# "docker run" is kept as the container id.
159 logger.info("Starting Docker container %s", docker_image)
160 self.cid = subprocess.check_output(["docker", "run",
162 "--publish=%i:80" % (self.port),
163 "--volume=%s:/mnt:ro" % self.mountdir,
164 docker_image]).strip()
166 self.prev_docker_image = docker_image
167 logger.info("Container id %s", self.cid)
# check_call and check_output raise CalledProcessError on a non-zero exit status.
169 except subprocess.CalledProcessError:
172 def wait_for_events(self):
# Wait for a reason to refresh the service: an event taken from self.evqueue,
# or the recorded container no longer showing up as running.
#
# Queue items are the (project, event type, object uuid) tuples queued by
# on_message, so eq[1] is the event type and eq[2] the object uuid.
# NOTE(review): the loop header and the handling of a queue timeout are not
# visible in this excerpt.
174 logger.warning("No service running! Will wait for a new collection to appear in the project.")
176 logger.info("Waiting for events")
181 # Main run loop. Wait on project events, signals, or the
182 # Docker container stopping.
185 # Poll the queue with a 1 second timeout; if we have no
186 # timeout the Python runtime doesn't have a chance to
187 # process SIGINT or SIGTERM.
188 eq = self.evqueue.get(True, 1)
189 logger.info("%s %s", eq[1], eq[2])
# Default to keeping the current collection, then override by event type.
190 self.newcollection = self.collection
191 if eq[1] in ('add', 'update', 'create'):
192 self.newcollection = eq[2]
193 elif eq[1] == 'remove':
# After a removal, select again the first collection owned by the project
# ordered by modified_at descending (None when there is none left).
194 collections = self.api.collections().list(filters=[["owner_uuid", "=", self.project]],
196 order='modified_at desc').execute()['items']
197 self.newcollection = collections[0]['uuid'] if collections else None
# A recorded container id that is no longer reported as running means the
# service stopped on its own.
202 if self.cid and not self.check_docker_running():
203 logger.warning("Service has terminated. Will try to restart.")
# NOTE(review): the def line of the method these statements belong to, and the
# try/loop/finally lines around them, are not visible in this excerpt.
# Visible behavior: mount the selected collection, wait for events, log on
# KeyboardInterrupt or any other Exception, and finally unmount and remove the
# mount directory.
212 self.mount_collection()
215 self.wait_for_events()
216 except (KeyboardInterrupt):
217 logger.info("Got keyboard interrupt")
220 except Exception as e:
# logger.exception records the traceback along with the message.
221 logger.exception("Caught fatal exception, shutting down")
228 logger.info("Unmounting")
# Unmount with fusermount first; os.rmdir then removes the directory created
# by tempfile.mkdtemp in run_fuse_mount.
229 subprocess.call(["fusermount", "-u", self.mountdir])
230 os.rmdir(self.mountdir)
# Command-line handling: parse the options from argv, install a SIGTERM
# handler and construct the ArvWeb instance.
# NOTE(review): the enclosing def line is not visible in this excerpt; these
# statements read `argv`, and the script entry point calls main(sys.argv[1:]).
234 parser = argparse.ArgumentParser()
235 parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
236 parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
237 parser.add_argument('--image', type=str, help="Docker image to run")
239 args = parser.parse_args(argv)
# On SIGTERM, call sys.exit(0), which raises SystemExit from the handler.
241 signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
244 arvweb = ArvWeb(args.project_uuid, args.image, args.port)
# ArvWeb.__init__ raises ArgumentError when the project uuid is not valid.
246 except arvados.errors.ArgumentError as e:
# Script entry point: pass the command-line arguments (without the program
# name) to main and use its return value as the process exit status.
252 if __name__ == '__main__':
253 sys.exit(main(sys.argv[1:]))