4904: Eliminate downtime switching collections.
[arvados.git] / services / arv-web / arv-web.py
1 #!/usr/bin/env python
2
3 import arvados
4 import subprocess
5 from arvados_fuse import Operations, SafeApi, CollectionDirectory
6 import tempfile
7 import os
8 import llfuse
9 import threading
10 import Queue
11 import argparse
12 import logging
13 import signal
14 import sys
15 import functools
16
def run_fuse_mount(api, collection):
    """Mount *collection* on a new temporary directory using FUSE.

    Builds an arvados_fuse ``Operations`` filesystem whose root inode is a
    ``CollectionDirectory`` for *collection*, calls ``llfuse.init`` on a
    fresh temporary directory with the ``allow_other`` option, and runs
    ``llfuse.main`` on a background thread.  Blocks until
    ``operations.initlock`` is signalled by the driver.

    Args:
        api: Arvados API client passed through to ``CollectionDirectory``.
        collection: collection identifier to expose at the mount root
            (may be None).

    Returns:
        (mountdir, cdir): the mount point path and the root
        ``CollectionDirectory``.  The caller can later point *cdir* at a
        different collection without remounting.

    If setup raises before ``llfuse.init`` completes, the temporary
    directory is removed before the exception propagates.
    """
    mountdir = tempfile.mkdtemp()
    mounted = False
    try:
        operations = Operations(os.getuid(), os.getgid(), "utf-8")
        # NOTE(review): the positional 2 is presumably a retry count for
        # CollectionDirectory -- confirm against arvados_fuse.
        cdir = CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection)
        operations.inodes.add_entry(cdir)

        # Initialize the fuse connection
        llfuse.init(operations, mountdir, ['allow_other'])
        mounted = True
    finally:
        if not mounted:
            # Setup failed: don't leak the (still empty) temporary directory.
            os.rmdir(mountdir)

    # Serve FUSE requests on a background thread.
    fuse_thread = threading.Thread(target=llfuse.main)
    fuse_thread.start()

    # Wait until the driver is finished initializing.
    operations.initlock.wait()

    return (mountdir, cdir)
34
def on_message(project, evqueue, ev):
    """Websocket callback: queue collection events that concern *project*.

    Events without an ``event_type`` key are ignored.  So are events for
    objects whose ``owner_uuid`` is *project* neither before nor after the
    change.  Relevant events are put on *evqueue* as a
    ``(project, event_type, object_uuid)`` tuple.  The event type is
    normalized as follows:

    * an ``update`` that moves the object into *project* becomes ``add``;
    * an ``update`` that moves it out of *project*, or whose new
      ``expires_at`` is set, becomes ``remove``;
    * a ``delete`` becomes ``remove``, so the consumer re-selects a collection;
    * any other type is passed through unchanged.

    Args:
        project: uuid of the project being watched.
        evqueue: queue-like object with a ``put`` method.
        ev: decoded event message (dict).
    """
    if 'event_type' not in ev:
        return

    props = ev['properties']
    # Either side may be absent or None: creates carry no old attributes,
    # and deletes presumably carry no new attributes.
    old_attr = props.get('old_attributes') or {}
    new_attr = props.get('new_attributes') or {}
    old_owner = old_attr.get('owner_uuid')
    new_owner = new_attr.get('owner_uuid')

    if project not in (new_owner, old_owner):
        return

    et = ev['event_type']
    if et == 'update':
        if new_owner != old_owner:
            # Ownership changed: the object entered or left the project.
            # Compare against the `project` argument; `args` is local to main().
            et = 'add' if new_owner == project else 'remove'
        if new_attr.get('expires_at') is not None:
            et = 'remove'
    elif et == 'delete':
        et = 'remove'

    evqueue.put((project, et, ev['object_uuid']))
55
def _latest_collection(api, project):
    """Return the uuid of the most recently modified collection owned by
    *project*, or None if the project has no collections."""
    items = api.collections().list(filters=[["owner_uuid", "=", project]],
                                   limit=1,
                                   order='modified_at desc').execute()['items']
    return items[0]['uuid'] if items else None


def _read_docker_image(mountdir, attempts=10, delay=1.0):
    """Return the stripped contents of ``<mountdir>/docker_image``.

    Returns None if the file does not exist.  Read errors and empty contents
    are retried up to *attempts* times, sleeping *delay* seconds between
    tries.  None is returned if no non-empty value was obtained, instead of
    spinning forever.
    """
    import time

    path = os.path.join(mountdir, "docker_image")
    for attempt in range(attempts):
        if not os.path.exists(path):
            return None
        try:
            with open(path) as di:
                image = di.read().strip()
        except IOError:
            image = None
        if image:
            return image
        if attempt + 1 < attempts:
            time.sleep(delay)
    return None


def main(argv):
    """Serve a project's collections through a Docker container.

    Mounts a collection owned by ``--project-uuid`` via FUSE. Initially this
    is the most recently modified one. The mount is exposed read-only at
    ``/mnt`` in a Docker container whose port 80 is published on
    ``--port``. The image comes from ``--image``, or else from the
    collection's ``docker_image`` file.

    Collection events then switch the mounted collection in place:
    * add, update and create events switch to the event's collection;
    * remove and delete events re-select the latest collection.

    If the image is unchanged, the running container is sent SIGHUP instead
    of being restarted. On exit the container is stopped, the mount is
    lazily unmounted, and the websocket is closed.

    Args:
        argv: command-line arguments, excluding the program name.
    """
    logger = logging.getLogger('arvados.arv-web')
    logger.setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
    parser.add_argument('--port', type=int, default=8080, help="Port to listen on (default 8080)")
    parser.add_argument('--image', type=str, help="Docker image to run")

    args = parser.parse_args(argv)

    api = SafeApi(arvados.config)
    project = args.project_uuid
    docker_image = args.image
    port = args.port
    evqueue = Queue.Queue()

    # Subscribe before the initial query. A collection added in between is
    # then seen as a queued event instead of being missed.
    ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]],
                                  functools.partial(on_message, project, evqueue))

    signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))

    loop = True
    cid = None                 # id of the container we started, or None
    prev_docker_image = None   # image of that container; None iff cid is None
    mountdir = None
    cdir = None
    collection = None

    try:
        newcollection = _latest_collection(api, project)
        while loop:
            loop = False
            if newcollection != collection:
                collection = newcollection
                if not mountdir:
                    (mountdir, cdir) = run_fuse_mount(api, collection)

                # Repoint the existing mount rather than remounting, so the
                # container's /mnt volume stays valid across switches.
                with llfuse.lock:
                    cdir.clear()
                    if collection:
                        logger.info("Mounting %s", collection)
                        cdir.collection_locator = collection
                        cdir.collection_object = None
                        cdir.clear()

            try:
                try:
                    if collection:
                        if not args.image:
                            docker_image = _read_docker_image(mountdir)

                        if not docker_image:
                            logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")

                        if docker_image != prev_docker_image:
                            if cid:
                                logger.info("Stopping docker container")
                                subprocess.check_call(["docker", "stop", cid])
                                # Forget the stopped container so a later
                                # collection with the same image restarts it.
                                cid = None
                                prev_docker_image = None

                            if docker_image:
                                logger.info("Starting docker container %s", docker_image)
                                cid = subprocess.check_output(["docker", "run",
                                                               "--detach=true",
                                                               "--publish=%i:80" % (port),
                                                               "--volume=%s:/mnt:ro" % mountdir,
                                                               docker_image])
                                cid = cid.rstrip()
                                prev_docker_image = docker_image
                                logger.info("Container id is %s", cid)
                        elif cid:
                            # Same image: signal the running service to reload.
                            subprocess.check_call(["docker", "kill", "--signal=HUP", cid])
                    elif cid:
                        logger.info("Stopping docker container")
                        subprocess.call(["docker", "stop", cid])
                        cid = None
                        prev_docker_image = None
                except subprocess.CalledProcessError:
                    logger.exception("Docker command failed")
                    cid = None
                    prev_docker_image = None

                if not cid:
                    logger.warning("No service running!  Will wait for a new collection to appear in the project.")
                else:
                    logger.info("Waiting for events")

                loop = True
                while True:
                    # Poll with a timeout instead of blocking indefinitely;
                    # presumably so signals/KeyboardInterrupt are still
                    # delivered to this thread under Python 2.
                    try:
                        eq = evqueue.get(True, 1)
                    except Queue.Empty:
                        continue

                    logger.info("%s %s", eq[1], eq[2])
                    newcollection = collection
                    if eq[1] in ('add', 'update', 'create'):
                        newcollection = eq[2]
                    elif eq[1] in ('remove', 'delete'):
                        newcollection = _latest_collection(api, project)
                    break

            except (KeyboardInterrupt):
                logger.info("Got keyboard interrupt")
                loop = False
            except Exception as e:
                logger.exception(e)
                loop = False
    finally:
        # Runs for every exit path, including SystemExit from the SIGTERM handler.
        if cid:
            logger.info("Stopping docker container")
            subprocess.call(["docker", "stop", cid])

        if mountdir:
            logger.info("Unmounting")
            subprocess.call(["fusermount", "-u", "-z", mountdir])
            os.rmdir(mountdir)

        ws.close()
if __name__ == "__main__":
    # Script entry point: hand the command-line arguments (without the
    # program name) to main().
    main(argv=sys.argv[1:])