Merge branch '3408-production-datamanager' refs #3408
[arvados.git] / services / arv-web / arv-web.py
1 #!/usr/bin/env python
2
3 # arv-web enables you to run a custom web service from the contents of an Arvados collection.
4 #
5 # See http://doc.arvados.org/user/topics/arv-web.html
6
7 import arvados
8 import subprocess
9 from arvados_fuse import Operations, SafeApi, CollectionDirectory
10 import tempfile
11 import os
12 import llfuse
13 import threading
14 import Queue
15 import argparse
16 import logging
17 import signal
18 import sys
19 import functools
20
# Module-wide logger for arv-web; set to INFO so that lifecycle messages
# (mounting, container start/stop, incoming events) are emitted.
logger = logging.getLogger('arvados.arv-web')
logger.setLevel(level=logging.INFO)
23
class ArvWeb(object):
    """Serve the newest collection of an Arvados project through a Docker container.

    The selected collection is exposed through an in-process arvados_fuse
    mount, which is bind-mounted read-only at /mnt inside the container.
    Collection events for the project (received via arvados.events) cause
    the mount to be switched to the newly selected collection.  The container
    is then either reloaded or restarted:

    * reloaded, by running /mnt/reload, when the image is unchanged and the
      collection has a "reload" file;
    * restarted otherwise.
    """

    def __init__(self, project, docker_image, port):
        """Validate arguments, pick the initial collection and subscribe to events.

        :param project: uuid of the project (group) to watch.
        :param docker_image: image that overrides the collection's
            "docker_image" file, or None to read the image name from that file.
        :param port: host port published to port 80 of the container.
        :raises arvados.errors.ArgumentError: if ``project`` is not a group uuid.
        """
        # Validate before creating an API client or doing any network work.
        if arvados.util.group_uuid_pattern.match(project) is None:
            raise arvados.errors.ArgumentError("Project uuid is not valid")

        self.project = project
        self.loop = True
        self.cid = None                 # id of the container we started, if any
        self.prev_docker_image = None   # image of the last container we started
        self.mountdir = None            # FUSE mount point, created lazily
        self.operations = None          # arvados_fuse Operations, created with the mount
        self.cdir = None                # CollectionDirectory at the mount root
        self.collection = None          # collection currently shown in the mount
        self.override_docker_image = docker_image
        self.port = port
        self.evqueue = Queue.Queue()
        self.api = SafeApi(arvados.config)

        self.newcollection = self._latest_collection()

        self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)

    def _latest_collection(self):
        """Return the uuid of the most recently modified collection in the project, or None."""
        collections = self.api.collections().list(filters=[["owner_uuid", "=", self.project]],
                                                  limit=1,
                                                  order='modified_at desc').execute()['items']
        return collections[0]['uuid'] if collections else None

    def check_docker_running(self):
        """Return True if the container we started (self.cid) is still running."""
        # It would be less hacky to use "docker events" than poll "docker ps"
        # but that would require writing a bigger pile of code.
        if not self.cid:
            return False
        ps = subprocess.check_output(["docker", "ps", "--no-trunc=true", "--filter=status=running"])
        return any(line.startswith(self.cid) for line in ps.splitlines())

    # Handle messages from Arvados event bus.
    def on_message(self, ev):
        """Translate an event-bus message into a queued ``(project, type, uuid)`` tuple.

        Registered as the arvados.events.subscribe callback.  It only
        enqueues, and the main loop (wait_for_events) acts on the queue.
        Messages without an "event_type", or about collections that neither
        are nor were owned by self.project, are ignored.

        The event type is rewritten so that wait_for_events can act on it:

        * an update that moves a collection into the project becomes "add";
        * an update that moves it out, or sets expires_at, becomes "remove";
        * a "delete" becomes "remove".
        """
        if 'event_type' not in ev:
            return

        props = ev.get('properties') or {}
        # Either attribute set may be absent or null; treat that as empty
        # instead of failing with KeyError/TypeError.
        old_attr = props.get('old_attributes') or {}
        new_attr = props.get('new_attributes') or {}

        new_owner = new_attr.get('owner_uuid')
        old_owner = old_attr.get('owner_uuid')
        if self.project not in (new_owner, old_owner):
            return

        et = ev['event_type']
        if et == 'update':
            # Only compare owners when the previous attributes are known.
            if old_attr and new_owner != old_owner:
                et = 'add' if new_owner == self.project else 'remove'
            if new_attr.get('expires_at') is not None:
                et = 'remove'
        elif et == 'delete':
            # A deleted collection must not stay selected; "remove" makes the
            # main loop re-query the newest collection in the project.
            et = 'remove'

        self.evqueue.put((self.project, et, ev['object_uuid']))

    # Run an arvados_fuse mount under the control of the local process.  This lets
    # us switch out the contents of the directory without having to unmount and
    # remount.
    def run_fuse_mount(self):
        """Create a temporary mount point and serve an arvados_fuse mount on it.

        The llfuse main loop runs in a separate thread of this process.
        Returns once the filesystem has finished initializing.
        """
        self.mountdir = tempfile.mkdtemp()

        self.operations = Operations(os.getuid(), os.getgid(), "utf-8")
        self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
        self.operations.inodes.add_entry(self.cdir)

        # Initialize the fuse connection
        llfuse.init(self.operations, self.mountdir, ['allow_other'])

        fuse_thread = threading.Thread(None, llfuse.main)
        fuse_thread.start()

        # wait until the driver is finished initializing
        self.operations.initlock.wait()

    def mount_collection(self):
        """Switch the FUSE mount to self.newcollection if it differs from self.collection.

        The mount is created on first use when there is a collection to show.
        Afterwards its contents are swapped in place, or emptied when there is
        no collection.
        """
        if self.newcollection == self.collection:
            return

        self.collection = self.newcollection
        if not self.mountdir and self.collection:
            self.run_fuse_mount()

        if self.mountdir:
            with llfuse.lock:
                self.cdir.clear()
                if self.collection:
                    logger.info("Mounting %s", self.collection)
                else:
                    logger.info("Mount is empty")
                # Switch the FUSE directory object so that it stores
                # the newly selected collection
                self.cdir.change_collection(self.collection)

    def stop_docker(self):
        """Stop the container we started, if any, and forget its id."""
        if self.cid:
            logger.info("Stopping Docker container")
            subprocess.call(["docker", "stop", self.cid])
            self.cid = None

    def _collection_docker_image(self):
        """Return the stripped contents (first 1024 bytes) of the collection's "docker_image" file, or None."""
        try:
            with llfuse.lock:
                if "docker_image" in self.cdir:
                    return self.cdir["docker_image"].readfrom(0, 1024).strip()
        except IOError:
            # Best effort: an unreadable file is treated as "no image given".
            pass
        return None

    def _collection_has_reload(self):
        """Return True if the mounted collection contains a "reload" file."""
        try:
            with llfuse.lock:
                return "reload" in self.cdir
        except IOError:
            return False

    def run_docker(self):
        """Start, restart, reload or stop the container for the current collection.

        * No collection, or no Docker image could be determined: the container
          is stopped.
        * Image unchanged, a container is running, and the collection has a
          "reload" file: ``/mnt/reload`` is executed inside the running
          container.
        * Otherwise: the container is (re)started with the mount at /mnt.

        If a docker command fails, the error is logged and any container we
        started is stopped, so that no unmanaged container is left behind.
        """
        try:
            if self.collection is None:
                self.stop_docker()
                return

            if self.override_docker_image:
                docker_image = self.override_docker_image
            else:
                docker_image = self._collection_docker_image()

            has_reload = self._collection_has_reload()

            if docker_image is None:
                logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
                self.stop_docker()
                return

            if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
                logger.info("Running container reload command")
                subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
                return

            self.stop_docker()

            logger.info("Starting Docker container %s", docker_image)
            self.cid = subprocess.check_output(["docker", "run",
                                                "--detach=true",
                                                "--publish=%i:80" % (self.port),
                                                "--volume=%s:/mnt:ro" % self.mountdir,
                                                docker_image]).strip()

            self.prev_docker_image = docker_image
            logger.info("Container id %s", self.cid)

        except subprocess.CalledProcessError as e:
            logger.error("Docker command failed: %s", e)
            # If "docker exec" failed, the old container is still running.
            # Stop it rather than merely dropping its id, which would orphan a
            # container that still holds the published host port.  (If
            # "docker run" failed, self.cid is already None and this is a no-op.)
            self.stop_docker()

    def wait_for_events(self):
        """Block until a project event arrives or the container stops running.

        The event queue is polled with a 1 second timeout.  On an event,
        self.newcollection is set to the collection that should be shown next.
        In either case self.loop is set so that run() iterates again.
        """
        if not self.cid:
            logger.warning("No service running!  Will wait for a new collection to appear in the project.")
        else:
            logger.info("Waiting for events")

        running = True
        self.loop = True
        while running:
            # Main run loop.  Wait on project events, signals, or the
            # Docker container stopping.

            try:
                # Poll the queue with a 1 second timeout, if we have no
                # timeout the Python runtime doesn't have a chance to
                # process SIGINT or SIGTERM.
                _, event_type, uuid = self.evqueue.get(True, 1)
                logger.info("%s %s", event_type, uuid)
                self.newcollection = self.collection
                if event_type in ('add', 'update', 'create'):
                    self.newcollection = uuid
                elif event_type == 'remove':
                    self.newcollection = self._latest_collection()
                running = False
            except Queue.Empty:
                pass

            if self.cid and not self.check_docker_running():
                logger.warning("Service has terminated.  Will try to restart.")
                self.cid = None
                running = False

    def run(self):
        """Main loop: mount the collection, run the container, wait for events, repeat.

        Stops on KeyboardInterrupt or on any other exception (which is logged).
        However run() exits -- including via SystemExit -- the event
        subscription is closed, the container is stopped, and the FUSE mount
        is unmounted and its directory removed.
        """
        try:
            while self.loop:
                self.loop = False
                self.mount_collection()
                try:
                    self.run_docker()
                    self.wait_for_events()
                except KeyboardInterrupt:
                    logger.info("Got keyboard interrupt")
                    self.loop = False
                except Exception:
                    logger.exception("Caught fatal exception, shutting down")
                    self.loop = False
        finally:
            try:
                self.ws.close()
            finally:
                self.stop_docker()

                if self.mountdir:
                    logger.info("Unmounting")
                    subprocess.call(["fusermount", "-u", self.mountdir])
                    try:
                        os.rmdir(self.mountdir)
                    except OSError as e:
                        # Don't let cleanup failure mask the original exit reason.
                        logger.warning("Could not remove mount point %s: %s", self.mountdir, e)
230
231
def main(argv):
    """Parse command-line arguments and run arv-web until it is stopped.

    :param argv: argument list, excluding the program name.
    :returns: process exit status -- 0 after a normal shutdown, 1 if ArvWeb
        rejects the arguments with arvados.errors.ArgumentError.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
    parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
    parser.add_argument('--image', type=str, help="Docker image to run")
    options = parser.parse_args(argv)

    # Convert SIGTERM into SystemExit so that ArvWeb.run's finally-cleanup runs.
    def _exit_on_sigterm(signum, frame):
        sys.exit(0)

    signal.signal(signal.SIGTERM, _exit_on_sigterm)

    try:
        ArvWeb(options.project_uuid, options.image, options.port).run()
    except arvados.errors.ArgumentError as err:
        logger.error(err)
        return 1
    return 0
250
if __name__ == '__main__':
    # Entry point when run as a script: exit with main()'s status code.
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)