129e0fb762b9df5171f1a97009a29f275b38f37d
[arvados.git] / services / arv-web / arv-web.py
1 #!/usr/bin/env python
2
3 import arvados
4 import subprocess
5 from arvados_fuse import Operations, SafeApi, CollectionDirectory
6 import tempfile
7 import os
8 import llfuse
9 import threading
10 import Queue
11 import argparse
12 import logging
13 import signal
14 import sys
15 import functools
16
17 def run_fuse_mount(api, collection):
18     mountdir = tempfile.mkdtemp()
19
20     operations = Operations(os.getuid(), os.getgid(), "utf-8")
21     cdir = CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection)
22     operations.inodes.add_entry(cdir)
23
24     # Initialize the fuse connection
25     llfuse.init(operations, mountdir, ['allow_other'])
26
27     t = threading.Thread(None, llfuse.main)
28     t.start()
29
30     # wait until the driver is finished initializing
31     operations.initlock.wait()
32
33     return (mountdir, cdir)
34
35 def on_message(project, evqueue, ev):
36     if 'event_type' in ev:
37         old_attr = None
38         if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
39             old_attr = ev['properties']['old_attributes']
40         if project not in (ev['properties']['new_attributes']['owner_uuid'],
41                                 old_attr['owner_uuid'] if old_attr else None):
42             return
43
44         et = ev['event_type']
45         if ev['event_type'] == 'update':
46             if ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
47                 if args.project_uuid == ev['properties']['new_attributes']['owner_uuid']:
48                     et = 'add'
49                 else:
50                     et = 'remove'
51             if ev['properties']['new_attributes']['expires_at'] is not None:
52                 et = 'remove'
53
54         evqueue.put((project, et, ev['object_uuid']))
55
56 def main(argv):
57     logger = logging.getLogger('arvados.arv-web')
58     logger.setLevel(logging.INFO)
59
60     parser = argparse.ArgumentParser()
61     parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
62     parser.add_argument('--port', type=int, default=8080, help="Port to listen on (default 8080)")
63     parser.add_argument('--image', type=str, help="Docker image to run")
64
65     args = parser.parse_args(argv)
66
67     api = SafeApi(arvados.config)
68     project = args.project_uuid
69     docker_image = args.image
70     port = args.port
71     evqueue = Queue.Queue()
72
73     collections = api.collections().list(filters=[["owner_uuid", "=", project]],
74                         limit=1,
75                         order='modified_at desc').execute()['items']
76     newcollection = collections[0]['uuid'] if len(collections) > 0 else None
77     collection = None
78
79     ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], functools.partial(on_message, project, evqueue))
80
81     signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
82
83     loop = True
84     cid = None
85     docker_proc = None
86     prev_docker_image = None
87     mountdir = None
88
89     try:
90         while loop:
91             loop = False
92             if newcollection != collection:
93                 collection = newcollection
94                 if not mountdir:
95                     (mountdir, cdir) = run_fuse_mount(api, collection)
96
97                 with llfuse.lock:
98                     cdir.clear()
99                     if collection:
100                         logger.info("Mounting %s", collection)
101                         cdir.collection_locator = collection
102                         cdir.collection_object = None
103                         cdir.update()
104
105             try:
106                 try:
107                     if collection:
108                         if not args.image:
109                             docker_image = None
110                             while not docker_image and os.path.exists(os.path.join(mountdir, "docker_image")):
111                                 try:
112                                     with open(os.path.join(mountdir, "docker_image")) as di:
113                                         docker_image = di.read().strip()
114                                 except IOError as e:
115                                     pass
116
117                         if not docker_image:
118                             logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
119
120                         if docker_image and ((docker_image != prev_docker_image) or cid is None):
121                             if cid:
122                                 logger.info("Stopping docker container")
123                                 subprocess.check_call(["docker", "stop", cid])
124                                 cid = None
125                                 docker_proc = None
126
127                             if docker_image:
128                                 logger.info("Starting docker container %s", docker_image)
129                                 ciddir = tempfile.mkdtemp()
130                                 cidfilepath = os.path.join(ciddir, "cidfile")
131                                 docker_proc = subprocess.Popen(["docker", "run",
132                                                                 "--cidfile=%s" % (cidfilepath),
133                                                                 "--publish=%i:80" % (port),
134                                                                 "--volume=%s:/mnt:ro" % mountdir,
135                                                                 docker_image])
136                                 cid = None
137                                 while not cid and docker_proc.poll() is None:
138                                     try:
139                                         with open(cidfilepath) as cidfile:
140                                             cid = cidfile.read().strip()
141                                     except IOError:
142                                         pass
143                                 os.unlink(cidfilepath)
144                                 os.rmdir(ciddir)
145
146                                 prev_docker_image = docker_image
147                                 logger.info("Container id %s", cid)
148                         elif cid:
149                             logger.info("Sending refresh signal to container")
150                             subprocess.check_call(["docker", "exec", cid, "killall", "--regexp", ".*", "--signal", "HUP"])
151                     elif cid:
152                         logger.info("Stopping docker container")
153                         subprocess.check_call(["docker", "stop", cid])
154                 except subprocess.CalledProcessError:
155                     cid = None
156                 if not cid:
157                     logger.warning("No service running!  Will wait for a new collection to appear in the project.")
158                 else:
159                     logger.info("Waiting for events")
160                 running = True
161                 loop = True
162                 while running:
163                     try:
164                         eq = evqueue.get(True, 1)
165                         logger.info("%s %s", eq[1], eq[2])
166                         newcollection = collection
167                         if eq[1] in ('add', 'update', 'create'):
168                             newcollection = eq[2]
169                         elif eq[1] == 'remove':
170                             collections = api.collections().list(filters=[["owner_uuid", "=", project]],
171                                                                 limit=1,
172                                                                 order='modified_at desc').execute()['items']
173                             newcollection = collections[0]['uuid'] if len(collections) > 0 else None
174                         running = False
175                     except Queue.Empty:
176                         pass
177                     if docker_proc and docker_proc.poll() is not None:
178                         logger.warning("Service has terminated unexpectedly, restarting.")
179                         cid = None
180                         docker_proc = None
181                         running = False
182
183             except (KeyboardInterrupt):
184                 logger.info("Got keyboard interrupt")
185                 ws.close()
186                 loop = False
187             except Exception as e:
188                 logger.exception(e)
189                 ws.close()
190                 loop = False
191     finally:
192         if cid:
193             logger.info("Stopping docker container")
194             subprocess.check_call(["docker", "stop", cid])
195
196         if mountdir:
197             logger.info("Unmounting")
198             subprocess.call(["fusermount", "-u", "-z", mountdir])
199             os.rmdir(mountdir)
200
201 if __name__ == '__main__':
202     main(sys.argv[1:])