Merge branch '14886-acr-recursion-runtime-error'
[arvados.git] / services / arv-web / arv-web.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 # arv-web enables you to run a custom web service from the contents of an Arvados collection.
7 #
8 # See http://doc.arvados.org/user/topics/arv-web.html
9
10 import arvados
11 from arvados.safeapi import ThreadSafeApiCache
12 import subprocess
13 from arvados_fuse import Operations, CollectionDirectory
14 import tempfile
15 import os
16 import llfuse
17 import threading
18 import Queue
19 import argparse
20 import logging
21 import signal
22 import sys
23 import functools
24
25 logger = logging.getLogger('arvados.arv-web')
26 logger.setLevel(logging.INFO)
27
28 class ArvWeb(object):
29     def __init__(self, project, docker_image, port):
30         self.project = project
31         self.loop = True
32         self.cid = None
33         self.prev_docker_image = None
34         self.mountdir = None
35         self.collection = None
36         self.override_docker_image = docker_image
37         self.port = port
38         self.evqueue = Queue.Queue()
39         self.api = ThreadSafeApiCache(arvados.config.settings())
40
41         if arvados.util.group_uuid_pattern.match(project) is None:
42             raise arvados.errors.ArgumentError("Project uuid is not valid")
43
44         collections = self.api.collections().list(filters=[["owner_uuid", "=", project]],
45                         limit=1,
46                         order='modified_at desc').execute()['items']
47         self.newcollection = collections[0]['uuid'] if collections else None
48
49         self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)
50
51     def check_docker_running(self):
52         # It would be less hacky to use "docker events" than poll "docker ps"
53         # but that would require writing a bigger pile of code.
54         if self.cid:
55             ps = subprocess.check_output(["docker", "ps", "--no-trunc=true", "--filter=status=running"])
56             for l in ps.splitlines():
57                 if l.startswith(self.cid):
58                     return True
59         return False
60
61     # Handle messages from Arvados event bus.
62     def on_message(self, ev):
63         if 'event_type' in ev:
64             old_attr = None
65             if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
66                 old_attr = ev['properties']['old_attributes']
67             if self.project not in (ev['properties']['new_attributes']['owner_uuid'],
68                                     old_attr['owner_uuid'] if old_attr else None):
69                 return
70
71             et = ev['event_type']
72             if ev['event_type'] == 'update':
73                 if ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
74                     if self.project == ev['properties']['new_attributes']['owner_uuid']:
75                         et = 'add'
76                     else:
77                         et = 'remove'
78                 if ev['properties']['new_attributes']['trash_at'] is not None:
79                     et = 'remove'
80
81             self.evqueue.put((self.project, et, ev['object_uuid']))
82
83     # Run an arvados_fuse mount under the control of the local process.  This lets
84     # us switch out the contents of the directory without having to unmount and
85     # remount.
86     def run_fuse_mount(self):
87         self.mountdir = tempfile.mkdtemp()
88
89         self.operations = Operations(os.getuid(), os.getgid(), self.api, "utf-8")
90         self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
91         self.operations.inodes.add_entry(self.cdir)
92
93         # Initialize the fuse connection
94         llfuse.init(self.operations, self.mountdir, ['allow_other'])
95
96         t = threading.Thread(None, llfuse.main)
97         t.start()
98
99         # wait until the driver is finished initializing
100         self.operations.initlock.wait()
101
102     def mount_collection(self):
103         if self.newcollection != self.collection:
104             self.collection = self.newcollection
105             if not self.mountdir and self.collection:
106                 self.run_fuse_mount()
107
108             if self.mountdir:
109                 with llfuse.lock:
110                     self.cdir.clear()
111                     # Switch the FUSE directory object so that it stores
112                     # the newly selected collection
113                     if self.collection:
114                         logger.info("Mounting %s", self.collection)
115                     else:
116                         logger.info("Mount is empty")
117                     self.cdir.change_collection(self.collection)
118
119
120     def stop_docker(self):
121         if self.cid:
122             logger.info("Stopping Docker container")
123             subprocess.call(["docker", "stop", self.cid])
124             self.cid = None
125
126     def run_docker(self):
127         try:
128             if self.collection is None:
129                 self.stop_docker()
130                 return
131
132             docker_image = None
133             if self.override_docker_image:
134                 docker_image = self.override_docker_image
135             else:
136                 try:
137                     with llfuse.lock:
138                         if "docker_image" in self.cdir:
139                             docker_image = self.cdir["docker_image"].readfrom(0, 1024).strip()
140                 except IOError as e:
141                     pass
142
143             has_reload = False
144             try:
145                 with llfuse.lock:
146                     has_reload = "reload" in self.cdir
147             except IOError as e:
148                 pass
149
150             if docker_image is None:
151                 logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
152                 self.stop_docker()
153                 return
154
155             if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
156                 logger.info("Running container reload command")
157                 subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
158                 return
159
160             self.stop_docker()
161
162             logger.info("Starting Docker container %s", docker_image)
163             self.cid = subprocess.check_output(["docker", "run",
164                                                 "--detach=true",
165                                                 "--publish=%i:80" % (self.port),
166                                                 "--volume=%s:/mnt:ro" % self.mountdir,
167                                                 docker_image]).strip()
168
169             self.prev_docker_image = docker_image
170             logger.info("Container id %s", self.cid)
171
172         except subprocess.CalledProcessError:
173             self.cid = None
174
175     def wait_for_events(self):
176         if not self.cid:
177             logger.warning("No service running!  Will wait for a new collection to appear in the project.")
178         else:
179             logger.info("Waiting for events")
180
181         running = True
182         self.loop = True
183         while running:
184             # Main run loop.  Wait on project events, signals, or the
185             # Docker container stopping.
186
187             try:
188                 # Poll the queue with a 1 second timeout, if we have no
189                 # timeout the Python runtime doesn't have a chance to
190                 # process SIGINT or SIGTERM.
191                 eq = self.evqueue.get(True, 1)
192                 logger.info("%s %s", eq[1], eq[2])
193                 self.newcollection = self.collection
194                 if eq[1] in ('add', 'update', 'create'):
195                     self.newcollection = eq[2]
196                 elif eq[1] == 'remove':
197                     collections = self.api.collections().list(filters=[["owner_uuid", "=", self.project]],
198                                                         limit=1,
199                                                         order='modified_at desc').execute()['items']
200                     self.newcollection = collections[0]['uuid'] if collections else None
201                 running = False
202             except Queue.Empty:
203                 pass
204
205             if self.cid and not self.check_docker_running():
206                 logger.warning("Service has terminated.  Will try to restart.")
207                 self.cid = None
208                 running = False
209
210
211     def run(self):
212         try:
213             while self.loop:
214                 self.loop = False
215                 self.mount_collection()
216                 try:
217                     self.run_docker()
218                     self.wait_for_events()
219                 except (KeyboardInterrupt):
220                     logger.info("Got keyboard interrupt")
221                     self.ws.close()
222                     self.loop = False
223                 except Exception as e:
224                     logger.exception("Caught fatal exception, shutting down")
225                     self.ws.close()
226                     self.loop = False
227         finally:
228             self.stop_docker()
229
230             if self.mountdir:
231                 logger.info("Unmounting")
232                 subprocess.call(["fusermount", "-u", self.mountdir])
233                 os.rmdir(self.mountdir)
234
235
236 def main(argv):
237     parser = argparse.ArgumentParser()
238     parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
239     parser.add_argument('--port', type=int, default=8080, help="Host port to listen on (default 8080)")
240     parser.add_argument('--image', type=str, help="Docker image to run")
241
242     args = parser.parse_args(argv)
243
244     signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
245
246     try:
247         arvweb = ArvWeb(args.project_uuid, args.image, args.port)
248         arvweb.run()
249     except arvados.errors.ArgumentError as e:
250         logger.error(e)
251         return 1
252
253     return 0
254
255 if __name__ == '__main__':
256     sys.exit(main(sys.argv[1:]))