+import functools
+
# Module-wide logger for the arv-web service; its threshold is set to INFO.
logger = logging.getLogger('arvados.arv-web')
logger.setLevel(logging.INFO)
+
+class ArvWeb(object):
def __init__(self, project, docker_image, port):
    """Set up state for serving the newest collection of one project.

    Creates the API client, validates ``project`` against the group uuid
    pattern, looks up the most recently modified collection owned by the
    project, and subscribes ``self.on_message`` to collection events.

    Args:
        project: uuid of the project whose collections are watched.
        docker_image: stored as ``override_docker_image``.
        port: stored as ``port``.

    Raises:
        arvados.errors.ArgumentError: if ``project`` does not match
            ``arvados.util.group_uuid_pattern``.
    """
    # Caller-supplied configuration.
    self.project = project
    self.override_docker_image = docker_image
    self.port = port

    # Runtime state, filled in later by the other methods.
    self.loop = True
    self.cid = None
    self.prev_docker_image = None
    self.mountdir = None
    self.collection = None

    self.evqueue = Queue.Queue()
    self.api = ThreadSafeApiCache(arvados.config.settings())

    uuid_match = arvados.util.group_uuid_pattern.match(project)
    if uuid_match is None:
        raise arvados.errors.ArgumentError("Project uuid is not valid")

    # Ask for just the single most recently modified collection.
    listing = self.api.collections().list(
        filters=[["owner_uuid", "=", project]],
        limit=1,
        order='modified_at desc').execute()
    newest = listing['items']
    if newest:
        self.newcollection = newest[0]['uuid']
    else:
        self.newcollection = None

    collection_filter = [["object_uuid", "is_a", "arvados#collection"]]
    self.ws = arvados.events.subscribe(self.api, collection_filter, self.on_message)
+
def check_docker_running(self):
    """Return True if ``self.cid`` starts a line of the running-container list.

    Returns False without invoking docker when no container id is recorded.
    """
    # It would be less hacky to use "docker events" than poll "docker ps"
    # but that would require writing a bigger pile of code.
    if not self.cid:
        return False

    listing = subprocess.check_output(
        ["docker", "ps", "--no-trunc=true", "--filter=status=running"])
    # Each output row is matched against the recorded id by prefix.
    return any(row.startswith(self.cid) for row in listing.splitlines())
+
+ # Handle messages from Arvados event bus.
def on_message(self, ev):
    """Turn an event-bus message into an entry on ``self.evqueue``.

    Messages without an ``event_type`` key are ignored, as are events where
    neither the new nor the old ``owner_uuid`` equals ``self.project``.
    Otherwise ``(self.project, event_type, ev['object_uuid'])`` is queued.

    For ``update`` events the queued type is rewritten:
      * ``add`` when the owner changed and the new owner is this project,
      * ``remove`` when the owner changed to anything else,
      * ``remove`` when the new attributes carry a non-null ``trash_at``.

    Missing or empty ``old_attributes`` / ``new_attributes`` and a missing
    ``trash_at`` are tolerated instead of raising from the callback.

    Args:
        ev: event dict; reads ``event_type``, ``properties`` and
            ``object_uuid``.
    """
    if 'event_type' not in ev:
        return

    props = ev['properties']
    new_attr = props.get('new_attributes') or {}
    old_attr = props.get('old_attributes') or None
    new_owner = new_attr.get('owner_uuid')
    old_owner = old_attr.get('owner_uuid') if old_attr else None

    # Not about this project, before or after the change.
    if self.project not in (new_owner, old_owner):
        return

    et = ev['event_type']
    if et == 'update':
        # A move can only be inferred when the previous owner is known;
        # without old attributes the event stays a plain 'update'.
        if old_attr and new_owner != old_owner:
            et = 'add' if self.project == new_owner else 'remove'
        # Checked last so that a trashed collection always ends up as 'remove'.
        if new_attr.get('trash_at') is not None:
            et = 'remove'

    self.evqueue.put((self.project, et, ev['object_uuid']))
+
+ # Run an arvados_fuse mount under the control of the local process. This lets
+ # us switch out the contents of the directory without having to unmount and
+ # remount.
def run_fuse_mount(self):
    """Create a temporary mount point and start a FUSE mount on it.

    Sets ``self.mountdir``, ``self.operations`` and ``self.cdir``, starts
    ``llfuse.main`` on a new thread, and returns once the operations
    object's ``initlock`` has been signalled.
    """
    self.mountdir = tempfile.mkdtemp()

    uid, gid = os.getuid(), os.getgid()
    self.operations = Operations(uid, gid, self.api, "utf-8")

    # The root directory of the mount is backed by the current collection.
    inodes = self.operations.inodes
    self.cdir = CollectionDirectory(llfuse.ROOT_INODE, inodes, self.api, 2, self.collection)
    inodes.add_entry(self.cdir)

    # Initialize the fuse connection
    llfuse.init(self.operations, self.mountdir, ['allow_other'])

    fuse_thread = threading.Thread(target=llfuse.main)
    fuse_thread.start()

    # wait until the driver is finished initializing
    self.operations.initlock.wait()
+
def mount_collection(self):
    """Make ``self.newcollection`` the current collection and update the mount.

    Does nothing when the pending collection equals the current one.
    Otherwise it becomes ``self.collection``. ``run_fuse_mount()`` is called
    if no mount directory exists yet and the collection is non-empty. When a
    mount directory exists, the directory object is cleared and pointed at
    the new collection under ``llfuse.lock``.
    """
    if self.newcollection == self.collection:
        return

    self.collection = self.newcollection

    # The mount is only created once there is something to show.
    if self.collection and not self.mountdir:
        self.run_fuse_mount()

    if not self.mountdir:
        return

    with llfuse.lock:
        self.cdir.clear()
        # Switch the FUSE directory object so that it stores
        # the newly selected collection
        if self.collection:
            logger.info("Mounting %s", self.collection)
        else:
            logger.info("Mount is empty")
        self.cdir.change_collection(self.collection)
+
+
def stop_docker(self):
    """Run ``docker stop`` on the recorded container id, then forget the id.

    Does nothing when ``self.cid`` is unset.
    """
    if not self.cid:
        return

    logger.info("Stopping Docker container")
    stop_cmd = ["docker", "stop", self.cid]
    subprocess.call(stop_cmd)
    self.cid = None
+
+ def run_docker(self):
+ try:
+ if self.collection is None:
+ self.stop_docker()
+ return
+
+ docker_image = None
+ if self.override_docker_image:
+ docker_image = self.override_docker_image
+ else:
+ try:
+ with llfuse.lock:
+ if "docker_image" in self.cdir:
+ docker_image = self.cdir["docker_image"].readfrom(0, 1024).strip()
+ except IOError as e:
+ pass
+
+ has_reload = False
+ try:
+ with llfuse.lock:
+ has_reload = "reload" in self.cdir
+ except IOError as e:
+ pass