4904: Rename to arv-web.py to reflect a more general purpose tool.
[arvados.git] / arv-web.py
1 import arvados
2 import subprocess
3 from arvados_fuse import Operations, SafeApi, CollectionDirectory
4 import tempfile
5 import os
6 import llfuse
7 import threading
8 import Queue
9 import argparse
10 import logging
11 import signal
12 import sys
13
# Send INFO-level (and more severe) messages to the root logger's handler.
logging.basicConfig(level=logging.INFO)

# Command line interface: the project and Docker image are mandatory, the
# local port is optional.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--project',
    type=str,
    required=True,
    help='Project to watch')
parser.add_argument(
    '--port',
    type=int,
    default=8080,
    help='Local bind port')
parser.add_argument(
    '--image',
    type=str,
    required=True,
    help='Docker image to run')

args = parser.parse_args()

# Module-level state shared with run_fuse_mount(), on_message() and the main
# loop further down in this file.
project, docker_image, port = args.project, args.image, args.port
api = SafeApi(arvados.config)
# Events accepted by on_message() are queued here for the main loop.
evqueue = Queue.Queue()
28
def run_fuse_mount(collection):
    """Mount `collection` with FUSE on a fresh temporary directory.

    The collection becomes the root directory of a new llfuse filesystem.
    The llfuse main loop is started on a background thread and this function
    blocks until the driver reports that it has finished initializing.

    Arguments:
      collection -- value handed to CollectionDirectory; the callers in this
                    script pass a collection UUID.

    Returns the path of the temporary mount point.  The caller is responsible
    for unmounting it and removing the directory.

    If setting up the filesystem or the FUSE connection fails, the temporary
    directory is removed again before the exception propagates.
    """
    mountdir = tempfile.mkdtemp()

    initialized = False
    try:
        operations = Operations(os.getuid(), os.getgid(), "utf-8", True)
        # NOTE(review): the literal 2 is presumably a retry count -- confirm
        # against the CollectionDirectory signature.
        operations.inodes.add_entry(CollectionDirectory(
            llfuse.ROOT_INODE, operations.inodes, api, 2, collection))

        # Initialize the fuse connection
        llfuse.init(operations, mountdir, ['allow_other'])
        initialized = True
    finally:
        if not initialized:
            # Nothing was mounted, so the directory is still empty; do not
            # leave it behind in the temp area.
            os.rmdir(mountdir)

    # llfuse.main() blocks while the filesystem is being served, so it gets
    # its own thread.
    t = threading.Thread(target=llfuse.main)
    t.start()

    # wait until the driver is finished initializing
    operations.initlock.wait()

    return mountdir
47
def on_message(ev):
    """Event callback: queue collection events that concern `project`.

    Messages without an 'event_type' key are ignored.  An event is relevant
    when the watched project is the owner in either the new or the old
    attributes of the object.  An 'update' that changes the owner is
    reported as 'add' (object moved into the project) or 'remove' (object
    moved out of it); every other event keeps its original type.

    Relevant events are put on `evqueue` as a
    (project, event_type, object_uuid) tuple.

    Arguments:
      ev -- dict describing the event.  Keys read here: 'event_type',
            'object_uuid' and 'properties' (with optional 'old_attributes'
            and 'new_attributes' dicts, each carrying 'owner_uuid').
    """
    if 'event_type' not in ev:
        return

    # Either attribute set may be missing or empty, so never index them
    # unguarded.
    properties = ev.get('properties') or {}
    old_attr = properties.get('old_attributes') or None
    new_attr = properties.get('new_attributes') or None
    old_owner = old_attr.get('owner_uuid') if old_attr else None
    new_owner = new_attr.get('owner_uuid') if new_attr else None

    if project not in (new_owner, old_owner):
        return

    et = ev['event_type']
    # An ownership change can only be detected when the old attributes are
    # available; otherwise the event is passed through as a plain update.
    if et == 'update' and old_attr is not None and new_owner != old_owner:
        et = 'add' if new_owner == project else 'remove'

    evqueue.put((project, et, ev['object_uuid']))
68
def _newest_collection_uuid():
    """Return the UUID of the most recently modified collection in `project`.

    Raises LookupError (with the project named in the message) when the
    project owns no collections.
    """
    items = api.collections().list(filters=[["owner_uuid", "=", project]],
                                   limit=1,
                                   order='modified_at desc').execute()['items']
    if not items:
        raise LookupError("No collections found in project %s" % project)
    return items[0]['uuid']


# Start out serving whichever collection in the project was modified last.
collection = _newest_collection_uuid()

# Every collection event is delivered to on_message(), which queues the ones
# that concern the watched project.
ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], on_message)

# Turn SIGTERM into SystemExit so the finally clauses below still run.
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))

loop = True
cid = None
try:
    while loop:
        logging.info("Mounting %s", collection)
        mountdir = run_fuse_mount(collection)
        # Forget any container id from the previous iteration so a failed
        # "docker run" below cannot lead to stopping a stale container.
        cid = None
        try:
            logging.info("Starting docker container")
            # Host port `port` is published to container port 80 and the FUSE
            # mount is exposed read-only at /mnt inside the container.
            cid = subprocess.check_output(["docker", "run",
                                           "--detach=true",
                                           "--publish=%i:80" % (port),
                                           "--volume=%s:/mnt:ro" % mountdir,
                                           docker_image])
            cid = cid.rstrip()
            logging.info("Container id is %s", cid)

            logging.info("Waiting for events")
            running = True
            while running:
                try:
                    # Wake up once a second instead of blocking indefinitely.
                    # NOTE(review): presumably so that signals and Ctrl-C
                    # are handled promptly -- confirm.
                    eq = evqueue.get(True, 1)
                except Queue.Empty:
                    continue

                logging.info("%s %s", eq[1], eq[2])
                newcollection = collection
                if eq[1] in ('add', 'update', 'create'):
                    # The collection that just changed becomes the one served.
                    newcollection = eq[2]
                elif eq[1] == 'remove':
                    # Fall back to the newest collection left in the project.
                    newcollection = _newest_collection_uuid()
                if newcollection != collection:
                    logging.info("restarting web service")
                    collection = newcollection
                    running = False
        except KeyboardInterrupt:
            logging.info("Got keyboard interrupt")
            loop = False
        except Exception as e:
            logging.exception(str(e))
            loop = False
        finally:
            if cid:
                logging.info("Stopping docker container")
                subprocess.call(["docker", "stop", cid])
                # Do not keep the id (or the exit status) around.
                cid = None

            logging.info("Unmounting")
            subprocess.call(["fusermount", "-u", "-z", mountdir])
            os.rmdir(mountdir)
finally:
    # Close the event subscription on every exit path, including SIGTERM
    # (SystemExit) and a failed mount, not only on handled exceptions.
    ws.close()