3 from arvados_fuse import Operations, SafeApi, CollectionDirectory
# Script-level setup: configure logging, parse the command line, build the
# API client and create the queue that carries events to the main loop.
# NOTE(review): every line of this listing carries its original line number
# as a prefix and some original lines are absent, so the comments describe
# only the statements that are visible.
14 logging.basicConfig(level=logging.INFO)
# Command-line interface: --project and --image are mandatory, --port
# falls back to 8080 when not given.
16 parser = argparse.ArgumentParser()
17 parser.add_argument('--project', type=str, required=True, help="Project to watch")
18 parser.add_argument('--port', type=int, default=8080, help="Local bind port")
19 parser.add_argument('--image', type=str, required=True, help="Docker image to run")
21 args = parser.parse_args()
# API client wrapper constructed from the arvados.config module.
23 api = SafeApi(arvados.config)
# Convenience aliases for the parsed options.
24 project = args.project
25 docker_image = args.image
# Queue filled by the evqueue.put(...) call in the event callback and
# drained by evqueue.get(...) in the main loop.
# NOTE(review): `Queue` (capitalised) is presumably the Python 2 standard
# library module; its import is not visible in this listing.
27 evqueue = Queue.Queue()
29 def run_fuse_mount(collection):
# Mount `collection` through FUSE on a newly created temporary directory.
# NOTE(review): some original lines of this function are absent from the
# listing (no thread start and no return statement are visible). The caller
# assigns the result to `mountdir`, so presumably the mount directory is
# returned -- confirm against the full file.
32 mountdir = tempfile.mkdtemp()
# Operations is built with the current process uid/gid and "utf-8"; the
# meaning of the trailing True is not visible here.
34 operations = Operations(os.getuid(), os.getgid(), "utf-8", True)
# Add a CollectionDirectory for `collection` to the inode table. It is
# constructed with llfuse.ROOT_INODE (presumably as its parent inode); the
# meaning of the literal 2 is not visible here -- TODO confirm.
35 operations.inodes.add_entry(CollectionDirectory(llfuse.ROOT_INODE, operations.inodes, api, 2, collection))
37 # Initialize the fuse connection
38 llfuse.init(operations, mountdir, ['allow_other'])
# Thread object whose target runs the llfuse main loop.
40 t = threading.Thread(None, lambda: llfuse.main())
43 # wait until the driver is finished initializing
44 operations.initlock.wait()
# Body of the event callback -- presumably `on_message(ev)`, the name that is
# passed to arvados.events.subscribe elsewhere in this script; its def line
# is not part of this listing. It inspects the owner of the changed object
# and queues the event for the main loop.
# Only messages that carry an 'event_type' key are examined.
52 if 'event_type' in ev:
# Remember the previous attributes when the event supplies a non-empty set.
54 if 'old_attributes' in ev['properties'] and ev['properties']['old_attributes']:
55 old_attr = ev['properties']['old_attributes']
# True when the watched project is neither the new owner nor the old owner.
# NOTE(review): the body of this `if` is not visible in the listing.
56 if project not in (ev['properties']['new_attributes']['owner_uuid'],
57 old_attr['owner_uuid'] if old_attr else None):
# An 'update' whose owner_uuid differs between old and new attributes means
# the object changed owner.
61 if ev['event_type'] == 'update' and ev['properties']['new_attributes']['owner_uuid'] != ev['properties']['old_attributes']['owner_uuid']:
# The object is now owned by the watched project.
62 if args.project == ev['properties']['new_attributes']['owner_uuid']:
# Queue a (project, event type, object uuid) tuple for the main loop.
# NOTE(review): `et` is assigned on lines that are not visible here; the
# main loop compares it against 'add', 'update', 'create' and 'remove'.
67 evqueue.put((project, et, ev['object_uuid']))
# Main flow of the script: choose a collection, subscribe to events, mount
# the collection, start the container, react to events, then clean up.
# NOTE(review): the listing omits original lines here as well (the loop and
# try/finally scaffolding and part of the docker argument list are absent).
#
# Start with the most recently modified collection owned by the project.
# NOTE(review): ['items'][0] assumes at least one collection is returned; an
# empty list would raise IndexError -- confirm this is handled elsewhere.
69 collection = api.collections().list(filters=[["owner_uuid", "=", project]],
71 order='modified_at desc').execute()['items'][0]['uuid']
# Subscribe to events on collection objects; on_message is the callback.
73 ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], on_message)
# Turn SIGTERM into a regular interpreter exit through sys.exit(0).
75 signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
80 logging.info("Mounting %s" % collection)
81 mountdir = run_fuse_mount(collection)
83 logging.info("Starting docker container")
# Start the container and keep whatever `docker run` prints as `cid`.
# --publish maps a local port to container port 80; --volume exposes the
# FUSE mount read-only at /mnt inside the container.
# NOTE(review): `port` is not assigned in any visible line (the parsed
# option is args.port) -- confirm it is defined in the omitted lines. The
# remaining arguments and the end of this call are not visible either.
84 cid = subprocess.check_output(["docker", "run",
86 "--publish=%i:80" % (port),
87 "--volume=%s:/mnt:ro" % mountdir,
90 logging.info("Container id is %s" % cid)
92 logging.info("Waiting for events")
# Blocking get with a 1 second timeout; each item is the tuple queued by the
# event callback: (project, event type, object uuid).
96 eq = evqueue.get(True, 1)
97 logging.info("%s %s" % (eq[1], eq[2]))
# Work out which collection should be served after this event.
98 newcollection = collection
# Added, updated or created objects become the served collection directly.
99 if eq[1] in ('add', 'update', 'create'):
100 newcollection = eq[2]
101 elif eq[1] == 'remove':
# After a removal, fall back to the most recently modified collection that
# the project still owns (same IndexError caveat as the initial lookup).
102 newcollection = api.collections().list(filters=[["owner_uuid", "=", project]],
104 order='modified_at desc').execute()['items'][0]['uuid']
# Only act when the selected collection actually changed.
# NOTE(review): the statements that perform the restart are not visible.
105 if newcollection != collection:
106 logging.info("restarting web service")
107 collection = newcollection
111 except (KeyboardInterrupt):
112 logging.info("Got keyboard interrupt")
# Any other exception is logged together with its traceback.
115 except Exception as e:
116 logging.exception(str(e))
# Cleanup: stop the container, then unmount the FUSE directory.
121 logging.info("Stopping docker container")
# NOTE(review): this rebinds `cid` to the exit status of `docker stop`.
122 cid = subprocess.call(["docker", "stop", cid])
124 logging.info("Unmounting")
# fusermount -u unmounts the directory; -z asks for a lazy unmount.
125 subprocess.call(["fusermount", "-u", "-z", mountdir])