# See http://doc.arvados.org/user/topics/arv-web.html
import arvados
+from arvados.safeapi import ThreadSafeApiCache
import subprocess
-from arvados_fuse import Operations, SafeApi, CollectionDirectory
+from arvados_fuse import Operations, CollectionDirectory
import tempfile
import os
import llfuse
self.project = project
self.loop = True
self.cid = None
- self.docker_proc = None
self.prev_docker_image = None
self.mountdir = None
self.collection = None
self.override_docker_image = docker_image
self.port = port
self.evqueue = Queue.Queue()
- self.api = SafeApi(arvados.config)
+ self.api = ThreadSafeApiCache(arvados.config.settings())
- if arvados.util.group_uuid_patternmatch(project) is None:
+ if arvados.util.group_uuid_pattern.match(project) is None:
raise arvados.errors.ArgumentError("Project uuid is not valid")
- collections = api.collections().list(filters=[["owner_uuid", "=", project]],
+ collections = self.api.collections().list(filters=[["owner_uuid", "=", project]],
limit=1,
order='modified_at desc').execute()['items']
- self.newcollection = collections[0]['uuid'] if len(collections) > 0 else None
+ self.newcollection = collections[0]['uuid'] if collections else None
- self.ws = arvados.events.subscribe(api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)
+ self.ws = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#collection"]], self.on_message)
+
+ def check_docker_running(self):
+ # Report whether the container recorded in self.cid appears in the
+ # output of "docker ps" restricted to running containers. Returns
+ # True when some output line starts with self.cid, and False
+ # otherwise (including when self.cid is unset).
+ #
+ # It would be less hacky to use "docker events" than poll "docker ps"
+ # but that would require writing a bigger pile of code.
+ if self.cid:
+ # check_output raises subprocess.CalledProcessError if the docker
+ # command exits with a non-zero status.
+ ps = subprocess.check_output(["docker", "ps", "--no-trunc=true", "--filter=status=running"])
+ for l in ps.splitlines():
+ # Presumably each row begins with the untruncated container id
+ # (hence --no-trunc=true), so a prefix match against self.cid
+ # identifies the container -- TODO confirm against docker output.
+ if l.startswith(self.cid):
+ return True
+ return False
# Handle messages from Arvados event bus.
def on_message(self, ev):
def run_fuse_mount(self):
self.mountdir = tempfile.mkdtemp()
- self.operations = Operations(os.getuid(), os.getgid(), "utf-8")
- self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, api, 2, self.collection)
- self.operations.inodes.add_entry(cdir)
+ self.operations = Operations(os.getuid(), os.getgid(), self.api, "utf-8")
+ self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
+ self.operations.inodes.add_entry(self.cdir)
# Initialize the fuse connection
- llfuse.init(operations, mountdir, ['allow_other'])
+ llfuse.init(self.operations, self.mountdir, ['allow_other'])
t = threading.Thread(None, llfuse.main)
t.start()
if self.mountdir:
with llfuse.lock:
self.cdir.clear()
+ # Switch the FUSE directory object so that it stores
+ # the newly selected collection
if self.collection:
- # Switch the FUSE directory object so that it stores
- # the newly selected collection
logger.info("Mounting %s", self.collection)
- cdir.change_collection(self.collection)
+ else:
+ logger.info("Mount is empty")
+ self.cdir.change_collection(self.collection)
+
def stop_docker(self):
+ # Ask Docker to stop the container identified by self.cid, if one is
+ # recorded, and then forget the container id.
if self.cid:
logger.info("Stopping Docker container")
- subprocess.check_call(["docker", "stop", cid])
+ # subprocess.call (unlike check_call) does not raise when the command
+ # exits non-zero, so a failed "docker stop" does not prevent self.cid
+ # from being reset below.
+ subprocess.call(["docker", "stop", self.cid])
self.cid = None
- self.docker_proc = None
def run_docker(self):
try:
if docker_image == self.prev_docker_image and self.cid is not None and has_reload:
logger.info("Running container reload command")
- subprocess.check_call(["docker", "exec", cid, "/mnt/reload"])
+ subprocess.check_call(["docker", "exec", self.cid, "/mnt/reload"])
return
self.stop_docker()
logger.info("Starting Docker container %s", docker_image)
- ciddir = tempfile.mkdtemp()
- cidfilepath = os.path.join(ciddir, "cidfile")
- self.docker_proc = subprocess.Popen(["docker", "run",
- "--cidfile=%s" % (cidfilepath),
- "--publish=%i:80" % (self.port),
- "--volume=%s:/mnt:ro" % self.mountdir,
- docker_image])
- self.cid = None
- while self.cid is None and self.docker_proc.poll() is None:
- try:
- with open(cidfilepath) as cidfile:
- self.cid = cidfile.read().strip()
- except IOError as e:
- # XXX check for ENOENT
- pass
-
- try:
- if os.path.exists(cidfilepath):
- os.unlink(cidfilepath)
- os.rmdir(ciddir)
- except OSError:
- pass
+ self.cid = subprocess.check_output(["docker", "run",
+ "--detach=true",
+ "--publish=%i:80" % (self.port),
+ "--volume=%s:/mnt:ro" % self.mountdir,
+ docker_image]).strip()
self.prev_docker_image = docker_image
logger.info("Container id %s", self.cid)
if eq[1] in ('add', 'update', 'create'):
self.newcollection = eq[2]
elif eq[1] == 'remove':
- collections = api.collections().list(filters=[["owner_uuid", "=", project]],
+ collections = self.api.collections().list(filters=[["owner_uuid", "=", self.project]],
limit=1,
order='modified_at desc').execute()['items']
- self.newcollection = collections[0]['uuid'] if len(collections) > 0 else None
+ self.newcollection = collections[0]['uuid'] if collections else None
running = False
except Queue.Empty:
pass
- if self.docker_proc and self.docker_proc.poll() is not None:
+ if self.cid and not self.check_docker_running():
logger.warning("Service has terminated. Will try to restart.")
self.cid = None
- self.docker_proc = None
running = False
self.ws.close()
self.loop = False
finally:
- if self.cid:
- logger.info("Stopping docker container")
- subprocess.call(["docker", "stop", self.cid])
+ self.stop_docker()
if self.mountdir:
logger.info("Unmounting")
signal.signal(signal.SIGTERM, lambda signal, frame: sys.exit(0))
try:
- arvweb = ArvWeb(args.project_uuid, args.image, args.ports)
+ arvweb = ArvWeb(args.project_uuid, args.image, args.port)
arvweb.run()
except arvados.errors.ArgumentError as e:
logger.error(e)
+ return 1
+
+ return 0
if __name__ == '__main__':
- main(sys.argv[1:])
+ # Pass the command-line arguments (minus the program name) to main()
+ # and use its return value as the process exit status.
+ sys.exit(main(sys.argv[1:]))