5 from arvados_fuse import Operations, SafeApi, CollectionDirectory
def run_fuse_mount(api, collection):
    """Mount ``collection`` through FUSE on a freshly created temp directory.

    A ``CollectionDirectory`` for ``collection`` is registered as the root
    inode of a new ``Operations`` instance, the FUSE connection is
    initialized on a new temporary directory, and the llfuse main loop is
    run on a background thread.  The call returns once the driver reports
    that it has finished initializing.

    Args:
        api: API client handed through to ``CollectionDirectory``.
        collection: identifier of the collection to expose at the mount root.

    Returns:
        A ``(mountdir, cdir)`` tuple: the path of the mount point and the
        ``CollectionDirectory`` object serving it.

    Raises:
        Whatever the setup or ``llfuse.init`` raises; in that case the
        temporary directory is removed again before the error propagates.
    """
    mountdir = tempfile.mkdtemp()

    try:
        operations = Operations(os.getuid(), os.getgid(), "utf-8")
        # NOTE(review): the literal 2 is passed through unchanged; its meaning
        # is defined by CollectionDirectory, which is not visible here.
        cdir = CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection)
        operations.inodes.add_entry(cdir)

        # Initialize the fuse connection
        llfuse.init(operations, mountdir, ['allow_other'])
    except Exception:
        # Nothing has been mounted yet, so the still-empty temp directory can
        # be removed instead of being leaked.
        os.rmdir(mountdir)
        raise

    # Run the FUSE request loop in the background.  The thread has to be
    # started explicitly; otherwise the wait below never returns.
    t = threading.Thread(None, llfuse.main)
    t.start()

    # wait until the driver is finished initializing
    operations.initlock.wait()

    return (mountdir, cdir)
def on_message(project, evqueue, ev):
    """Turn a collection event into a ``(project, kind, uuid)`` queue entry.

    Messages without an ``event_type`` key and events that do not involve
    ``project`` (neither the new nor the previous owner is ``project``) are
    ignored.  For ``update`` events the kind is refined:

    * the owner changed and the new owner is ``project``  -> ``'add'``
    * the owner changed and the new owner is someone else -> ``'remove'``
    * the collection has a non-``None`` ``expires_at``    -> ``'remove'``

    Every other event is forwarded with its original ``event_type``.

    Args:
        project: uuid of the watched project.
        evqueue: object with a ``put`` method that receives the tuple.
        ev: event dictionary; expected to carry ``properties`` (with
            ``new_attributes`` and optionally ``old_attributes``) and
            ``object_uuid`` whenever ``event_type`` is present.
    """
    if 'event_type' not in ev:
        return

    properties = ev['properties']
    new_attr = properties['new_attributes']
    # old_attributes may be missing or empty (e.g. for newly created objects).
    old_attr = properties.get('old_attributes') or None
    old_owner = old_attr['owner_uuid'] if old_attr else None

    if project not in (new_attr['owner_uuid'], old_owner):
        # Event belongs to some other project.
        return

    et = ev['event_type']
    if et == 'update':
        if old_attr is not None and new_attr['owner_uuid'] != old_owner:
            # Ownership changed: the collection either moved into the watched
            # project or out of it.  Compare against the ``project`` argument
            # rather than a global ``args`` so the callback is self-contained.
            et = 'add' if new_attr['owner_uuid'] == project else 'remove'
        if new_attr.get('expires_at') is not None:
            # A collection with an expiry set is treated as removed.
            et = 'remove'

    evqueue.put((project, et, ev['object_uuid']))
# Service driver: watches a project for collections, mounts the selected
# collection with FUSE and keeps a docker container running against it.
# NOTE(review): these statements read `argv` and contain `except` clauses
# without a visible `try`; the enclosing function header and the loop/try
# scaffolding are not visible in this excerpt, so each comment describes only
# the statement(s) directly below it.

# Logger for all status output, at INFO level.
57 logger = logging.getLogger('arvados.arv-web')
58 logger.setLevel(logging.INFO)
# Command line: --project-uuid is mandatory, --port defaults to 8080,
# --image is optional.
60 parser = argparse.ArgumentParser()
61 parser.add_argument('--project-uuid', type=str, required=True, help="Project uuid to watch")
62 parser.add_argument('--port', type=int, default=8080, help="Port to listen on (default 8080)")
63 parser.add_argument('--image', type=str, help="Docker image to run")
65 args = parser.parse_args(argv)
67 api = SafeApi(arvados.config)
68 project = args.project_uuid
69 docker_image = args.image
# Queue that on_message() fills with (project, event kind, object uuid).
71 evqueue = Queue.Queue()
# Initial selection: the most recently modified collection owned by the
# project, or None when the project has no collections.
73 collections = api.collections().list(filters=[["owner_uuid", "=", project]],
75 order='modified_at desc').execute()['items']
76 newcollection = collections[0]['uuid'] if len(collections) > 0 else None
# Subscribe to collection events; on_message is pre-bound to this project and
# queue so the subscription only has to supply the event itself.
79 ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], functools.partial(on_message, project, evqueue))
# Turn SIGTERM into SystemExit (presumably so the cleanup at the end of this
# block still runs -- confirm against the enclosing try/finally).
81 signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
# Image used for the currently running container; compared below to decide
# whether a restart is needed.
86 prev_docker_image = None
# Switch to the newly selected collection when it differs from the current one.
92 if newcollection != collection:
93 collection = newcollection
# Create the FUSE mount; returns the mount point and its CollectionDirectory.
95 (mountdir, cdir) = run_fuse_mount(api, collection)
# Point the CollectionDirectory at the selected collection and clear its
# collection_object (presumably so it is loaded again -- TODO confirm).
# NOTE(review): whether this runs on the same pass as run_fuse_mount above is
# not visible here.
100 logger.info("Mounting %s", collection)
101 cdir.collection_locator = collection
102 cdir.collection_object = None
# Without an image so far, take the image name from the file `docker_image`
# at the top of the mount, if that file exists.
110 while not docker_image and os.path.exists(os.path.join(mountdir, "docker_image")):
112 with open(os.path.join(mountdir, "docker_image")) as di:
113 docker_image = di.read().strip()
118 logger.error("Collection must contain a file 'docker_image' or must specify --image on the command line.")
# (Re)start the container when an image is known and either the image changed
# or no container id is currently recorded.
120 if docker_image and ((docker_image != prev_docker_image) or cid is None):
122 logger.info("Stopping docker container")
123 subprocess.check_call(["docker", "stop", cid])
128 logger.info("Starting docker container %s", docker_image)
# docker writes the new container's id into the cidfile, which lives in its
# own temporary directory.
129 ciddir = tempfile.mkdtemp()
130 cidfilepath = os.path.join(ciddir, "cidfile")
# Launch `docker run`, publishing `port` to container port 80 and exposing the
# FUSE mount read-only at /mnt.
# NOTE(review): the assignment of `port` and the remaining arguments of this
# call are not visible here.
131 docker_proc = subprocess.Popen(["docker", "run",
132 "--cidfile=%s" % (cidfilepath),
133 "--publish=%i:80" % (port),
134 "--volume=%s:/mnt:ro" % mountdir,
# Keep reading the cidfile until an id is obtained or `docker run` has exited.
137 while not cid and docker_proc.poll() is None:
139 with open(cidfilepath) as cidfile:
140 cid = cidfile.read().strip()
143 os.unlink(cidfilepath)
146 prev_docker_image = docker_image
147 logger.info("Container id %s", cid)
# Signal the running container: `killall --regexp ".*" --signal HUP` inside
# the container sends SIGHUP to every process matching any name.
149 logger.info("Sending refresh signal to container")
150 subprocess.check_call(["docker", "exec", cid, "killall", "--regexp", ".*", "--signal", "HUP"])
152 logger.info("Stopping docker container")
153 subprocess.check_call(["docker", "stop", cid])
# A failing docker command (non-zero exit from check_call) is handled here.
154 except subprocess.CalledProcessError:
157 logger.warning("No service running! Will wait for a new collection to appear in the project.")
159 logger.info("Waiting for events")
# Block for at most one second waiting for the next queued event.
164 eq = evqueue.get(True, 1)
165 logger.info("%s %s", eq[1], eq[2])
# Pick the collection to serve next: an added/updated/created collection is
# selected directly; after a removal the project's most recently modified
# remaining collection (or None) is selected.
166 newcollection = collection
167 if eq[1] in ('add', 'update', 'create'):
168 newcollection = eq[2]
169 elif eq[1] == 'remove':
170 collections = api.collections().list(filters=[["owner_uuid", "=", project]],
172 order='modified_at desc').execute()['items']
173 newcollection = collections[0]['uuid'] if len(collections) > 0 else None
# poll() returning a value means the `docker run` process has exited.
177 if docker_proc and docker_proc.poll() is not None:
178 logger.warning("Service has terminated unexpectedly, restarting.")
183 except (KeyboardInterrupt):
184 logger.info("Got keyboard interrupt")
187 except Exception as e:
# Cleanup: stop the container, then unmount the FUSE mount
# (`fusermount -u -z` = lazy unmount).
193 logger.info("Stopping docker container")
194 subprocess.check_call(["docker", "stop", cid])
197 logger.info("Unmounting")
198 subprocess.call(["fusermount", "-u", "-z", mountdir])
201 if __name__ == '__main__':