Merge branch 'master' into 4904-arv-web
[arvados.git] / services / arv-web / arv-web.py
1 #!/usr/bin/env python
2
3 # arv-web enables you to run a custom web service from the contents of an Arvados collection.
4 #
5 # See http://doc.arvados.org/user/topics/arv-web.html
6
7 import arvados
8 import subprocess
9 from arvados_fuse import Operations, SafeApi, CollectionDirectory
10 import tempfile
11 import os
12 import llfuse
13 import threading
14 import Queue
15 import argparse
16 import logging
17 import signal
18 import sys
19 import functools
20
# Module-level logger shared by everything in this file; its threshold is
# set to INFO.
logger = logging.getLogger('arvados.arv-web')
logger.setLevel(logging.INFO)
23
class ArvWeb(object):
    """Run a Docker-hosted web service backed by a collection in a project.

    The most recently modified collection owned by ``project`` is exposed
    through a FUSE mount, and that mount is bind-mounted read-only at /mnt
    inside a Docker container whose port 80 is published on ``port``.
    Collection events received from the Arvados event bus are queued by
    on_message() and consumed by wait_for_events(), which causes run() to
    re-mount and reload/restart the container.
    """

    def __init__(self, project, docker_image, port):
        """Validate the project uuid, pick the initial collection, subscribe to events.

        project      -- uuid of the project to watch
        docker_image -- Docker image to run; when falsy, the image name is
                        read from the 'docker_image' file in the collection
        port         -- host port published to the container's port 80

        Raises arvados.errors.ArgumentError if ``project`` does not match
        arvados.util.group_uuid_pattern.
        """
        self.project = project
        self.loop = True
        self.cid = None
        self.prev_docker_image = None
        self.mountdir = None
        self.collection = None
        self.override_docker_image = docker_image
        self.port = port
        self.evqueue = Queue.Queue()
        self.api = SafeApi(arvados.config)

        if arvados.util.group_uuid_pattern.match(project) is None:
            raise arvados.errors.ArgumentError("Project uuid is not valid")

        self.newcollection = self._latest_collection_uuid()

        self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)

    def _latest_collection_uuid(self):
        """Return the uuid of the most recently modified collection owned by
        the project, or None when the project has no collections."""
        collections = self.api.collections().list(filters=[["owner_uuid", "=", self.project]],
                                                  limit=1,
                                                  order='modified_at desc').execute()['items']
        return collections[0]['uuid'] if collections else None

    def check_docker_running(self):
        """Return True if the container started by run_docker() is listed by
        "docker ps" as running, False otherwise (including when no container
        has been started)."""
        # It would be less hacky to use "docker events" than poll "docker ps"
        # but that would require writing a bigger pile of code.
        if self.cid:
            ps = subprocess.check_output(["docker", "ps", "--no-trunc=true", "--filter=status=running"])
            for l in ps.splitlines():
                if l.startswith(self.cid):
                    return True
        return False

    # Handle messages from Arvados event bus.
    def on_message(self, ev):
        """Translate an event bus message into a (project, event, uuid) queue entry.

        Messages without 'event_type' and events that do not involve the
        watched project (as old or new owner) are ignored.  An 'update' that
        changes the owner becomes 'add' or 'remove' depending on whether the
        collection moved into or out of the project; an 'update' that sets
        'expires_at' becomes 'remove'.  A 'delete' is reported as 'remove'.
        """
        if 'event_type' not in ev:
            return

        # Either attribute set may be missing or None (e.g. no old attributes
        # on a create, no new attributes on a delete), so never subscript
        # them directly.
        props = ev.get('properties') or {}
        old_attr = props.get('old_attributes') or {}
        new_attr = props.get('new_attributes') or {}
        old_owner = old_attr.get('owner_uuid')
        new_owner = new_attr.get('owner_uuid')

        if self.project not in (new_owner, old_owner):
            return

        et = ev['event_type']
        if et == 'update':
            if new_owner != old_owner:
                if self.project == new_owner:
                    et = 'add'
                else:
                    et = 'remove'
            if new_attr.get('expires_at') is not None:
                et = 'remove'
        elif et == 'delete':
            # The collection is gone; let wait_for_events() select the next
            # most recent collection instead of keeping the deleted one.
            et = 'remove'

        self.evqueue.put((self.project, et, ev['object_uuid']))

    # Run an arvados_fuse mount under the control of the local process.  This lets
    # us switch out the contents of the directory without having to unmount and
    # remount.
    def run_fuse_mount(self):
        """Create a temporary mount point and start serving self.collection on it.

        Sets self.mountdir, self.operations and self.cdir, starts llfuse.main
        in a new thread, and returns once self.operations.initlock is set.
        """
        self.mountdir = tempfile.mkdtemp()

        self.operations = Operations(os.getuid(), os.getgid(), "utf-8")
        # NOTE(review): the positional literal 2 is interpreted by
        # arvados_fuse.CollectionDirectory; confirm its meaning there.
        self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
        self.operations.inodes.add_entry(self.cdir)

        # Initialize the fuse connection
        llfuse.init(self.operations, self.mountdir, ['allow_other'])

        t = threading.Thread(None, llfuse.main)
        t.start()

        # wait until the driver is finished initializing
        self.operations.initlock.wait()

    def mount_collection(self):
        """Make the mount reflect self.newcollection if it differs from the
        currently mounted collection.  The FUSE mount is created lazily the
        first time there is a collection to show."""
        if self.newcollection != self.collection:
            self.collection = self.newcollection
            if not self.mountdir and self.collection:
                self.run_fuse_mount()

            if self.mountdir:
                with llfuse.lock:
                    self.cdir.clear()
                    # Switch the FUSE directory object so that it stores
                    # the newly selected collection
                    if self.collection:
                        logger.info("Mounting %s", self.collection)
                    else:
                        logger.info("Mount is empty")
                    self.cdir.change_collection(self.collection)

    def stop_docker(self):
        """Stop the tracked container, if any, and forget its id."""
        if self.cid:
            logger.info("Stopping Docker container")
            subprocess.call(["docker", "stop", self.cid])
            self.cid = None

    def run_docker(self):
        """Start, reload, or stop the service container for self.collection.

        - No collection, or no image name available: stop the container.
        - Same image as before, container tracked, and the collection has a
          'reload' entry: run /mnt/reload inside the container.  If that
          command fails the container is stopped and started afresh.
        - Otherwise: stop any tracked container and start a new one.

        A failing docker command leaves self.cid set to None.
        """
        try:
            if self.collection is None:
                self.stop_docker()
                return

            docker_image = None
            if self.override_docker_image:
                docker_image = self.override_docker_image
            else:
                try:
                    with llfuse.lock:
                        if "docker_image" in self.cdir:
                            docker_image = self.cdir["docker_image"].readfrom(0, 1024).strip()
                except IOError:
                    # Best effort: an unreadable file is treated the same as
                    # a missing one and reported below.
                    pass

            has_reload = False
            try:
                with llfuse.lock:
                    has_reload = "reload" in self.cdir
            except IOError:
                # Best effort: without a usable listing, assume no reload
                # command and fall back to restarting the container.
                pass

            if docker_image is None:
                logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
                self.stop_docker()
                return

            if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
                logger.info("Running container reload command")
                try:
                    subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
                except subprocess.CalledProcessError:
                    # Do not just forget the container id here: the container
                    # would keep running untracked and keep the published
                    # port busy.  Fall through to a full stop and restart.
                    logger.warning("Reload command failed.  Will restart the container.")
                else:
                    return

            self.stop_docker()

            logger.info("Starting Docker container %s", docker_image)
            self.cid = subprocess.check_output(["docker", "run",
                                                "--detach=true",
                                                "--publish=%i:80" % (self.port),
                                                "--volume=%s:/mnt:ro" % self.mountdir,
                                                docker_image]).strip()

            self.prev_docker_image = docker_image
            logger.info("Container id %s", self.cid)

        except subprocess.CalledProcessError as e:
            logger.error("Docker command failed: %s", e)
            self.cid = None

    def wait_for_events(self):
        """Block until a project event arrives or the container stops running.

        On return self.loop is True and self.newcollection names the
        collection that should be mounted next ('add', 'update' and 'create'
        select the event's collection; 'remove' selects the most recently
        modified collection left in the project; any other event keeps the
        current one).
        """
        if not self.cid:
            logger.warning("No service running!  Will wait for a new collection to appear in the project.")
        else:
            logger.info("Waiting for events")

        running = True
        self.loop = True
        while running:
            # Main run loop.  Wait on project events, signals, or the
            # Docker container stopping.

            try:
                # Poll the queue with a 1 second timeout, if we have no
                # timeout the Python runtime doesn't have a chance to
                # process SIGINT or SIGTERM.
                eq = self.evqueue.get(True, 1)
                logger.info("%s %s", eq[1], eq[2])
                self.newcollection = self.collection
                if eq[1] in ('add', 'update', 'create'):
                    self.newcollection = eq[2]
                elif eq[1] == 'remove':
                    self.newcollection = self._latest_collection_uuid()
                running = False
            except Queue.Empty:
                pass

            if self.cid and not self.check_docker_running():
                logger.warning("Service has terminated.  Will try to restart.")
                self.cid = None
                running = False

    def run(self):
        """Mount, serve and watch until interrupted, then clean up.

        Cleanup (closing the event subscription, stopping the container,
        unmounting and removing the mount point) happens on every exit path,
        including SystemExit.
        """
        try:
            while self.loop:
                self.loop = False
                self.mount_collection()
                try:
                    self.run_docker()
                    self.wait_for_events()
                except KeyboardInterrupt:
                    logger.info("Got keyboard interrupt")
                    self.loop = False
                except Exception:
                    logger.exception("Caught fatal exception, shutting down")
                    self.loop = False
        finally:
            try:
                # Closed here rather than in the handlers above so that
                # SystemExit (not an Exception subclass) also releases the
                # event subscription.
                self.ws.close()
            finally:
                self.stop_docker()

                if self.mountdir:
                    logger.info("Unmounting")
                    subprocess.call(["fusermount", "-u", self.mountdir])
                    try:
                        os.rmdir(self.mountdir)
                    except OSError as e:
                        # e.g. the unmount failed and the directory is still
                        # busy; do not let that mask the original error.
                        logger.warning("Could not remove mount point %s: %s", self.mountdir, e)
232
233
def main(argv):
    """Parse command-line arguments from ``argv`` and run the arv-web service.

    ``argv`` is the argument list without the program name.  SIGTERM is
    turned into a normal interpreter exit (status 0).  An
    arvados.errors.ArgumentError raised while constructing or running
    ArvWeb is logged as an error instead of propagating.
    """
    def _exit_on_sigterm(signum, frame):
        sys.exit(0)

    cli = argparse.ArgumentParser()
    cli.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
    cli.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
    cli.add_argument('--image', type=str, help="Docker image to run")
    options = cli.parse_args(argv)

    signal.signal(signal.SIGTERM, _exit_on_sigterm)

    try:
        ArvWeb(options.project_uuid, options.image, options.port).run()
    except arvados.errors.ArgumentError as err:
        logger.error(err)
249
# Script entry point: hand the command-line arguments (without the program
# name) to main().
if __name__ == '__main__':
    main(sys.argv[1:])