5 from arvados_fuse import Operations, SafeApi, CollectionDirectory
# Mount `collection` through FUSE on a newly created temporary directory.
#
# Parameters:
#   api        -- API client object, handed unchanged to CollectionDirectory.
#   collection -- identifier of the collection to expose, handed unchanged to
#                 CollectionDirectory.
# Returns:
#   (mountdir, cdir): the temporary mount point path and the
#   CollectionDirectory that was registered as the root inode.
#
# NOTE(review): this listing omits some numbered lines (e.g. 28-29), so not
# every statement of the function is visible here.
17 def run_fuse_mount(api, collection):
# mkdtemp() creates the directory; nothing visible in this function removes it.
18 mountdir = tempfile.mkdtemp()
20 operations = Operations(os.getuid(), os.getgid(), "utf-8")
# The directory is created with inode llfuse.ROOT_INODE and added to the
# operations' inode table. The literal 2 is passed positionally; its meaning
# is defined by CollectionDirectory and is not visible here.
21 cdir = CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection)
22 operations.inodes.add_entry(cdir)
24 # Initialize the fuse connection
25 llfuse.init(operations, mountdir, ['allow_other'])
# llfuse.main is the thread target, so the FUSE loop is meant to run off the
# calling thread.
# NOTE(review): no `t.start()` is visible between creating the thread and
# waiting on initlock; it may be on an omitted line -- confirm, because
# without it the wait below would presumably never return.
27 t = threading.Thread(None, llfuse.main)
30 # wait until the driver is finished initializing
31 operations.initlock.wait()
33 return (mountdir, cdir)
35 def on_message(project, evqueue, ev):
36 if 'event_type' in ev:
38 if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
39 old_attr = ev['properties']['old_attributes']
40 if project not in (ev['properties']['new_attributes']['owner_uuid'],
41 old_attr['owner_uuid'] if old_attr else None):
45 if ev['event_type'] == 'update':
46 if ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
47 if args.project_uuid == ev['properties']['new_attributes']['owner_uuid']:
51 if ev['properties']['new_attributes']['expires_at'] is not None:
54 evqueue.put((project, et, ev['object_uuid']))
# Body of the service entry point. The enclosing `def` line and several
# control-flow lines (loop / try / else headers) are not visible in this
# listing; `argv` below is expected to come from that hidden header.
#
# Visible behaviour, in order: configure logging, parse command-line options,
# pick a collection from the project, subscribe to collection events, then
# mount the collection, run a docker container against the mount, and react
# to queued events until interrupted. On the way out the container is stopped
# and the mount is released.
57 logger = logging.getLogger('arvados.arv-web')
58 logger.setLevel(logging.INFO)
60 parser = argparse.ArgumentParser()
61 parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
62 parser.add_argument('--port', type=int, default=8080, help="Port to listen on (default 8080)")
63 parser.add_argument('--image', type=str, help="Docker image to run")
65 args = parser.parse_args(argv)
67 api = SafeApi(arvados.config)
68 project = args.project_uuid
69 docker_image = args.image
# Queue filled by on_message() with (project, event_type, object_uuid) tuples.
71 evqueue = Queue.Queue()
# List collections owned by the project, newest modification first, and start
# with the first one returned (None when the project has no collections).
73 collections = api.collections().list(filters=[["owner_uuid", "=", project]],
75 order='modified_at desc').execute()['items']
76 newcollection = collections[0]['uuid'] if len(collections) > 0 else None
# Subscribe to events on collection objects; project and evqueue are pre-bound
# so the callback only receives the event.
79 ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], functools.partial(on_message, project, evqueue))
# SIGTERM is converted into sys.exit(0), i.e. a SystemExit raised in the main
# thread -- presumably so the cleanup at the end of this body still runs.
81 signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
85 prev_docker_image = None
# Only touch the mount when the selected collection actually changed.
# NOTE(review): no assignment to `collection` is visible before this
# comparison; presumably it is initialised on an omitted line -- confirm.
91 if newcollection != collection:
92 collection = newcollection
# Two ways of pointing the mount at `collection` are visible: creating a new
# FUSE mount, or retargeting the existing CollectionDirectory by replacing its
# locator and clearing collection_object. The conditions that choose between
# them are on omitted lines.
94 (mountdir, cdir) = run_fuse_mount(api, collection)
99 logger.info("Mounting %s", collection)
100 cdir.collection_locator = collection
101 cdir.collection_object = None
# When no image is selected yet, read the image name from a 'docker_image'
# file at the root of the mount.
# NOTE(review): if that file exists but is empty after strip(), the loop
# condition stays true -- confirm the omitted lines prevent a busy loop.
109 while not docker_image and os.path.exists(os.path.join(mountdir, "docker_image")):
111 with open(os.path.join(mountdir, "docker_image")) as di:
112 docker_image = di.read().strip()
117 logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
# A different image than the one previously started: stop the old container
# and start a new one.
119 if docker_image != prev_docker_image:
121 logger.info("Stopping docker container")
122 subprocess.check_call(["docker", "stop", cid])
125 logger.info("Starting docker container %s", docker_image)
# Container port 80 is published on the host port and the FUSE mount is bound
# read-only at /mnt. The output of `docker run` is kept as the container id.
# NOTE(review): `port` has no visible assignment (the parser only defines
# args.port); it may be set on an omitted line -- confirm.
126 cid = subprocess.check_output(["docker", "run",
128 "--publish=%i:80" % (port),
129 "--volume=%s:/mnt:ro" % mountdir,
132 prev_docker_image = docker_image
133 logger.info("Container id is %s", cid)
# Sends SIGHUP to the running container; presumably the path taken when the
# image did not change (the branch header is not visible).
135 subprocess.check_call(["docker", "kill", "--signal=HUP", cid])
137 logger.info("Stopping docker container")
138 subprocess.call(["docker", "stop", cid])
# A failing docker command (check_call / check_output) lands here.
139 except subprocess.CalledProcessError:
143 logger.warning("No service running! Will wait for a new collection to appear in the project.")
145 logger.info("Waiting for events")
# Blocking get with a 1 second timeout. eq is the tuple queued by
# on_message(): eq[1] is the event type, eq[2] the object uuid.
150 eq = evqueue.get(True, 1)
151 logger.info("%s %s", eq[1], eq[2])
152 newcollection = collection
# add/update/create: switch to the collection named in the event.
# remove: re-query the project and fall back to the first collection returned.
153 if eq[1] in ('add', 'update', 'create'):
154 newcollection = eq[2]
155 elif eq[1] == 'remove':
156 collections = api.collections().list(filters=[["owner_uuid", "=", project]],
158 order='modified_at desc').execute()['items']
159 newcollection = collections[0]['uuid'] if len(collections) > 0 else None
164 except (KeyboardInterrupt):
165 logger.info("Got keyboard interrupt")
# Catch-all handler; its body is on omitted lines.
168 except Exception as e:
174 logger.info("Stopping docker container")
# NOTE(review): this rebinds cid to the exit status returned by
# subprocess.call(); cid is not read again on any visible line, so the
# assignment looks unintended -- confirm.
175 cid = subprocess.call(["docker", "stop", cid])
178 logger.info("Unmounting")
# fusermount -u -z: unmount the FUSE mount point lazily.
179 subprocess.call(["fusermount", "-u", "-z", mountdir])
182 if __name__ == '__main__':