Merge branch '8800-queue-query'
[arvados.git] / services / arv-web / arv-web.py
1 #!/usr/bin/env python
2
3 # arv-web enables you to run a custom web service from the contents of an Arvados collection.
4 #
5 # See http://doc.arvados.org/user/topics/arv-web.html
6
7 import arvados
8 from arvados.safeapi import ThreadSafeApiCache
9 import subprocess
10 from arvados_fuse import Operations, CollectionDirectory
11 import tempfile
12 import os
13 import llfuse
14 import threading
15 import Queue
16 import argparse
17 import logging
18 import signal
19 import sys
20 import functools
21
22 logger = logging.getLogger('arvados.arv-web')
23 logger.setLevel(logging.INFO)
24
25 class ArvWeb(object):
26     def __init__(self, project, docker_image, port):
27         self.project = project
28         self.loop = True
29         self.cid = None
30         self.prev_docker_image = None
31         self.mountdir = None
32         self.collection = None
33         self.override_docker_image = docker_image
34         self.port = port
35         self.evqueue = Queue.Queue()
36         self.api = ThreadSafeApiCache(arvados.config.settings())
37
38         if arvados.util.group_uuid_pattern.match(project) is None:
39             raise arvados.errors.ArgumentError("Project uuid is not valid")
40
41         collections = self.api.collections().list(filters=[["owner_uuid", "=", project]],
42                         limit=1,
43                         order='modified_at desc').execute()['items']
44         self.newcollection = collections[0]['uuid'] if collections else None
45
46         self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)
47
48     def check_docker_running(self):
49         # It would be less hacky to use "docker events" than poll "docker ps"
50         # but that would require writing a bigger pile of code.
51         if self.cid:
52             ps = subprocess.check_output(["docker", "ps", "--no-trunc=true", "--filter=status=running"])
53             for l in ps.splitlines():
54                 if l.startswith(self.cid):
55                     return True
56         return False
57
58     # Handle messages from Arvados event bus.
59     def on_message(self, ev):
60         if 'event_type' in ev:
61             old_attr = None
62             if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
63                 old_attr = ev['properties']['old_attributes']
64             if self.project not in (ev['properties']['new_attributes']['owner_uuid'],
65                                     old_attr['owner_uuid'] if old_attr else None):
66                 return
67
68             et = ev['event_type']
69             if ev['event_type'] == 'update':
70                 if ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
71                     if self.project == ev['properties']['new_attributes']['owner_uuid']:
72                         et = 'add'
73                     else:
74                         et = 'remove'
75                 if ev['properties']['new_attributes']['expires_at'] is not None:
76                     et = 'remove'
77
78             self.evqueue.put((self.project, et, ev['object_uuid']))
79
80     # Run an arvados_fuse mount under the control of the local process.  This lets
81     # us switch out the contents of the directory without having to unmount and
82     # remount.
83     def run_fuse_mount(self):
84         self.mountdir = tempfile.mkdtemp()
85
86         self.operations = Operations(os.getuid(), os.getgid(), self.api, "utf-8")
87         self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
88         self.operations.inodes.add_entry(self.cdir)
89
90         # Initialize the fuse connection
91         llfuse.init(self.operations, self.mountdir, ['allow_other'])
92
93         t = threading.Thread(None, llfuse.main)
94         t.start()
95
96         # wait until the driver is finished initializing
97         self.operations.initlock.wait()
98
99     def mount_collection(self):
100         if self.newcollection != self.collection:
101             self.collection = self.newcollection
102             if not self.mountdir and self.collection:
103                 self.run_fuse_mount()
104
105             if self.mountdir:
106                 with llfuse.lock:
107                     self.cdir.clear()
108                     # Switch the FUSE directory object so that it stores
109                     # the newly selected collection
110                     if self.collection:
111                         logger.info("Mounting %s", self.collection)
112                     else:
113                         logger.info("Mount is empty")
114                     self.cdir.change_collection(self.collection)
115
116
117     def stop_docker(self):
118         if self.cid:
119             logger.info("Stopping Docker container")
120             subprocess.call(["docker", "stop", self.cid])
121             self.cid = None
122
123     def run_docker(self):
124         try:
125             if self.collection is None:
126                 self.stop_docker()
127                 return
128
129             docker_image = None
130             if self.override_docker_image:
131                 docker_image = self.override_docker_image
132             else:
133                 try:
134                     with llfuse.lock:
135                         if "docker_image" in self.cdir:
136                             docker_image = self.cdir["docker_image"].readfrom(0, 1024).strip()
137                 except IOError as e:
138                     pass
139
140             has_reload = False
141             try:
142                 with llfuse.lock:
143                     has_reload = "reload" in self.cdir
144             except IOError as e:
145                 pass
146
147             if docker_image is None:
148                 logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
149                 self.stop_docker()
150                 return
151
152             if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
153                 logger.info("Running container reload command")
154                 subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
155                 return
156
157             self.stop_docker()
158
159             logger.info("Starting Docker container %s", docker_image)
160             self.cid = subprocess.check_output(["docker", "run",
161                                                 "--detach=true",
162                                                 "--publish=%i:80" % (self.port),
163                                                 "--volume=%s:/mnt:ro" % self.mountdir,
164                                                 docker_image]).strip()
165
166             self.prev_docker_image = docker_image
167             logger.info("Container id %s", self.cid)
168
169         except subprocess.CalledProcessError:
170             self.cid = None
171
172     def wait_for_events(self):
173         if not self.cid:
174             logger.warning("No service running!  Will wait for a new collection to appear in the project.")
175         else:
176             logger.info("Waiting for events")
177
178         running = True
179         self.loop = True
180         while running:
181             # Main run loop.  Wait on project events, signals, or the
182             # Docker container stopping.
183
184             try:
185                 # Poll the queue with a 1 second timeout, if we have no
186                 # timeout the Python runtime doesn't have a chance to
187                 # process SIGINT or SIGTERM.
188                 eq = self.evqueue.get(True, 1)
189                 logger.info("%s %s", eq[1], eq[2])
190                 self.newcollection = self.collection
191                 if eq[1] in ('add', 'update', 'create'):
192                     self.newcollection = eq[2]
193                 elif eq[1] == 'remove':
194                     collections = self.api.collections().list(filters=[["owner_uuid", "=", self.project]],
195                                                         limit=1,
196                                                         order='modified_at desc').execute()['items']
197                     self.newcollection = collections[0]['uuid'] if collections else None
198                 running = False
199             except Queue.Empty:
200                 pass
201
202             if self.cid and not self.check_docker_running():
203                 logger.warning("Service has terminated.  Will try to restart.")
204                 self.cid = None
205                 running = False
206
207
208     def run(self):
209         try:
210             while self.loop:
211                 self.loop = False
212                 self.mount_collection()
213                 try:
214                     self.run_docker()
215                     self.wait_for_events()
216                 except (KeyboardInterrupt):
217                     logger.info("Got keyboard interrupt")
218                     self.ws.close()
219                     self.loop = False
220                 except Exception as e:
221                     logger.exception("Caught fatal exception, shutting down")
222                     self.ws.close()
223                     self.loop = False
224         finally:
225             self.stop_docker()
226
227             if self.mountdir:
228                 logger.info("Unmounting")
229                 subprocess.call(["fusermount", "-u", self.mountdir])
230                 os.rmdir(self.mountdir)
231
232
233 def main(argv):
234     parser = argparse.ArgumentParser()
235     parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
236     parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
237     parser.add_argument('--image', type=str, help="Docker image to run")
238
239     args = parser.parse_args(argv)
240
241     signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
242
243     try:
244         arvweb = ArvWeb(args.project_uuid, args.image, args.port)
245         arvweb.run()
246     except arvados.errors.ArgumentError as e:
247         logger.error(e)
248         return 1
249
250     return 0
251
252 if __name__ == '__main__':
253     sys.exit(main(sys.argv[1:]))