Merge branch '10813-arv-put-six-threads'
[arvados.git] / sdk / python / tests / run_test_server.py
index b5536da77c072653105935df69a2e350d8ddecd5..b969b12a7abc6374ae0993d0d95c2f5cf85f784b 100644 (file)
@@ -1,7 +1,10 @@
 #!/usr/bin/env python
 
 #!/usr/bin/env python
 
+from __future__ import print_function
 import argparse
 import atexit
 import argparse
 import atexit
+import errno
+import glob
 import httplib2
 import os
 import pipes
 import httplib2
 import os
 import pipes
@@ -9,8 +12,9 @@ import random
 import re
 import shutil
 import signal
 import re
 import shutil
 import signal
-import subprocess
+import socket
 import string
 import string
+import subprocess
 import sys
 import tempfile
 import time
 import sys
 import tempfile
 import time
@@ -24,12 +28,11 @@ if __name__ == '__main__' and os.path.exists(
     # Add the Python SDK source to the library path.
     sys.path.insert(1, os.path.dirname(MY_DIRNAME))
 
     # Add the Python SDK source to the library path.
     sys.path.insert(1, os.path.dirname(MY_DIRNAME))
 
-import arvados.api
+import arvados
 import arvados.config
 
 ARVADOS_DIR = os.path.realpath(os.path.join(MY_DIRNAME, '../../..'))
 SERVICES_SRC_DIR = os.path.join(ARVADOS_DIR, 'services')
 import arvados.config
 
 ARVADOS_DIR = os.path.realpath(os.path.join(MY_DIRNAME, '../../..'))
 SERVICES_SRC_DIR = os.path.join(ARVADOS_DIR, 'services')
-SERVER_PID_PATH = 'tmp/pids/test-server.pid'
 if 'GOPATH' in os.environ:
     gopaths = os.environ['GOPATH'].split(':')
     gobins = [os.path.join(path, 'bin') for path in gopaths]
 if 'GOPATH' in os.environ:
     gopaths = os.environ['GOPATH'].split(':')
     gobins = [os.path.join(path, 'bin') for path in gopaths]
@@ -40,6 +43,8 @@ if not os.path.exists(TEST_TMPDIR):
     os.mkdir(TEST_TMPDIR)
 
 my_api_host = None
     os.mkdir(TEST_TMPDIR)
 
 my_api_host = None
+_cached_config = {}
+_cached_db_config = {}
 
 def find_server_pid(PID_PATH, wait=10):
     now = time.time()
 
 def find_server_pid(PID_PATH, wait=10):
     now = time.time()
@@ -51,9 +56,7 @@ def find_server_pid(PID_PATH, wait=10):
             with open(PID_PATH, 'r') as f:
                 server_pid = int(f.read())
             good_pid = (os.kill(server_pid, 0) is None)
             with open(PID_PATH, 'r') as f:
                 server_pid = int(f.read())
             good_pid = (os.kill(server_pid, 0) is None)
-        except IOError:
-            good_pid = False
-        except OSError:
+        except EnvironmentError:
             good_pid = False
         now = time.time()
 
             good_pid = False
         now = time.time()
 
@@ -68,56 +71,133 @@ def kill_server_pid(pidfile, wait=10, passenger_root=False):
     import signal
     import subprocess
     import time
     import signal
     import subprocess
     import time
-    try:
-        if passenger_root:
-            # First try to shut down nicely
-            restore_cwd = os.getcwd()
-            os.chdir(passenger_root)
-            subprocess.call([
-                'bundle', 'exec', 'passenger', 'stop', '--pid-file', pidfile])
-            os.chdir(restore_cwd)
-        now = time.time()
-        timeout = now + wait
-        with open(pidfile, 'r') as f:
-            server_pid = int(f.read())
-        while now <= timeout:
-            if not passenger_root or timeout - now < wait / 2:
-                # Half timeout has elapsed. Start sending SIGTERM
-                os.kill(server_pid, signal.SIGTERM)
-            # Raise OSError if process has disappeared
-            os.getpgid(server_pid)
+
+    now = time.time()
+    startTERM = now
+    deadline = now + wait
+
+    if passenger_root:
+        # First try to shut down nicely
+        restore_cwd = os.getcwd()
+        os.chdir(passenger_root)
+        subprocess.call([
+            'bundle', 'exec', 'passenger', 'stop', '--pid-file', pidfile])
+        os.chdir(restore_cwd)
+        # Use up to half of the +wait+ period waiting for "passenger
+        # stop" to work. If the process hasn't exited by then, start
+        # sending TERM signals.
+        startTERM += wait/2
+
+    server_pid = None
+    while now <= deadline and server_pid is None:
+        try:
+            with open(pidfile, 'r') as f:
+                server_pid = int(f.read())
+        except IOError:
+            # No pidfile = nothing to kill.
+            return
+        except ValueError as error:
+            # Pidfile exists, but we can't parse it. Perhaps the
+            # server has created the file but hasn't written its PID
+            # yet?
+            print("Parse error reading pidfile {}: {}".format(pidfile, error),
+                  file=sys.stderr)
             time.sleep(0.1)
             now = time.time()
             time.sleep(0.1)
             now = time.time()
-    except IOError:
-        pass
-    except OSError:
-        pass
+
+    while now <= deadline:
+        try:
+            exited, _ = os.waitpid(server_pid, os.WNOHANG)
+            if exited > 0:
+                return
+        except OSError:
+            # already exited, or isn't our child process
+            pass
+        try:
+            if now >= startTERM:
+                os.kill(server_pid, signal.SIGTERM)
+                print("Sent SIGTERM to {} ({})".format(server_pid, pidfile),
+                      file=sys.stderr)
+        except OSError as error:
+            if error.errno == errno.ESRCH:
+                # Thrown by os.getpgid() or os.kill() if the process
+                # does not exist, i.e., our work here is done.
+                return
+            raise
+        time.sleep(0.1)
+        now = time.time()
+
+    print("Server PID {} ({}) did not exit, giving up after {}s".
+          format(server_pid, pidfile, wait),
+          file=sys.stderr)
 
 def find_available_port():
 
 def find_available_port():
-    """Return a port number that is not in use right now.
+    """Return an IPv4 port number that is not in use right now.
+
+    We assume whoever needs to use the returned port is able to reuse
+    a recently used port without waiting for TIME_WAIT (see
+    SO_REUSEADDR / SO_REUSEPORT).
 
     Some opportunity for races here, but it's better than choosing
     something at random and not checking at all. If all of our servers
     (hey Passenger) knew that listening on port 0 was a thing, the OS
     would take care of the races, and this wouldn't be needed at all.
     """
 
     Some opportunity for races here, but it's better than choosing
     something at random and not checking at all. If all of our servers
     (hey Passenger) knew that listening on port 0 was a thing, the OS
     would take care of the races, and this wouldn't be needed at all.
     """
-    port = None
-    while port is None:
-        port = random.randint(20000, 40000)
-        port_hex = ':%04x ' % port
-        try:
-            with open('/proc/net/tcp', 'r') as f:
-                for line in f:
-                    if 0 <= string.find(line, port_hex):
-                        port = None
-                        break
-        except OSError:
-            # This isn't going so well. Just use the random port.
-            pass
-        except IOError:
-            pass
+
+    sock = socket.socket()
+    sock.bind(('0.0.0.0', 0))
+    port = sock.getsockname()[1]
+    sock.close()
     return port
 
     return port
 
+def _wait_until_port_listens(port, timeout=10):
+    """Wait for a process to start listening on the given port.
+
+    If nothing listens on the port within the specified timeout (given
+    in seconds), print a warning on stderr before returning.
+    """
+    try:
+        subprocess.check_output(['which', 'lsof'])
+    except subprocess.CalledProcessError:
+        print("WARNING: No `lsof` -- cannot wait for port to listen. "+
+              "Sleeping 0.5 and hoping for the best.",
+              file=sys.stderr)
+        time.sleep(0.5)
+        return
+    deadline = time.time() + timeout
+    while time.time() < deadline:
+        try:
+            subprocess.check_output(
+                ['lsof', '-t', '-i', 'tcp:'+str(port)])
+        except subprocess.CalledProcessError:
+            time.sleep(0.1)
+            continue
+        return
+    print(
+        "WARNING: Nothing is listening on port {} (waited {} seconds).".
+        format(port, timeout),
+        file=sys.stderr)
+
+def _fifo2stderr(label):
+    """Create a fifo, and copy it to stderr, prepending label to each line.
+
+    Return value is the path to the new FIFO.
+
+    +label+ should contain only alphanumerics: it is also used as part
+    of the FIFO filename.
+    """
+    fifo = os.path.join(TEST_TMPDIR, label+'.fifo')
+    try:
+        os.remove(fifo)
+    except OSError as error:
+        if error.errno != errno.ENOENT:
+            raise
+    os.mkfifo(fifo, 0700)
+    subprocess.Popen(
+        ['stdbuf', '-i0', '-oL', '-eL', 'sed', '-e', 's/^/['+label+'] /', fifo],
+        stdout=sys.stderr)
+    return fifo
+
 def run(leave_running_atexit=False):
     """Ensure an API server is running, and ARVADOS_API_* env vars have
     admin credentials for it.
 def run(leave_running_atexit=False):
     """Ensure an API server is running, and ARVADOS_API_* env vars have
     admin credentials for it.
@@ -135,27 +215,36 @@ def run(leave_running_atexit=False):
     """
     global my_api_host
 
     """
     global my_api_host
 
-    # Delete cached discovery document.
-    shutil.rmtree(arvados.http_cache('discovery'))
+    # Delete cached discovery documents.
+    #
+    # This will clear cached docs that belong to other processes (like
+    # concurrent test suites) even if they're still running. They should
+    # be able to tolerate that.
+    for fn in glob.glob(os.path.join(arvados.http_cache('discovery'),
+                                     '*,arvados,v1,rest,*')):
+        os.unlink(fn)
 
 
-    pid_file = os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH)
+    pid_file = _pidfile('api')
     pid_file_ok = find_server_pid(pid_file, 0)
 
     existing_api_host = os.environ.get('ARVADOS_TEST_API_HOST', my_api_host)
     if existing_api_host and pid_file_ok:
     pid_file_ok = find_server_pid(pid_file, 0)
 
     existing_api_host = os.environ.get('ARVADOS_TEST_API_HOST', my_api_host)
     if existing_api_host and pid_file_ok:
-        try:
-            reset()
-            return
-        except:
-            pass
+        if existing_api_host == my_api_host:
+            try:
+                return reset()
+            except:
+                # Fall through to shutdown-and-start case.
+                pass
+        else:
+            # Server was provided by parent. Can't recover if it's
+            # unresettable.
+            return reset()
 
     # Before trying to start up our own server, call stop() to avoid
     # "Phusion Passenger Standalone is already running on PID 12345".
 
     # Before trying to start up our own server, call stop() to avoid
     # "Phusion Passenger Standalone is already running on PID 12345".
-    # We want to kill it if it's our own _or_ it's some stale
-    # left-over server. But if it's been deliberately provided to us
-    # by a parent process, we don't want to force-kill it. That'll
-    # just wreck things for the next test suite that tries to use it.
-    stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
+    # (If we've gotten this far, ARVADOS_TEST_API_HOST isn't set, so
+    # we know the server is ours to kill.)
+    stop(force=True)
 
     restore_cwd = os.getcwd()
     api_src_dir = os.path.join(SERVICES_SRC_DIR, 'api')
 
     restore_cwd = os.getcwd()
     api_src_dir = os.path.join(SERVICES_SRC_DIR, 'api')
@@ -165,6 +254,15 @@ def run(leave_running_atexit=False):
     # died, or we have lost our credentials, or something else is
     # preventing us from calling reset(). Start a new one.
 
     # died, or we have lost our credentials, or something else is
     # preventing us from calling reset(). Start a new one.
 
+    if not os.path.exists('tmp'):
+        os.makedirs('tmp')
+
+    if not os.path.exists('tmp/api'):
+        os.makedirs('tmp/api')
+
+    if not os.path.exists('tmp/logs'):
+        os.makedirs('tmp/logs')
+
     if not os.path.exists('tmp/self-signed.pem'):
         # We assume here that either passenger reports its listening
         # address as https:/0.0.0.0:port/. If it reports "127.0.0.1"
     if not os.path.exists('tmp/self-signed.pem'):
         # We assume here that either passenger reports its listening
         # address as https:/0.0.0.0:port/. If it reports "127.0.0.1"
@@ -177,12 +275,29 @@ def run(leave_running_atexit=False):
             '-out', 'tmp/self-signed.pem',
             '-keyout', 'tmp/self-signed.key',
             '-days', '3650',
             '-out', 'tmp/self-signed.pem',
             '-keyout', 'tmp/self-signed.key',
             '-days', '3650',
-            '-subj', '/CN=0.0.0.0'])
+            '-subj', '/CN=0.0.0.0'],
+        stdout=sys.stderr)
+
+    # Install the git repository fixtures.
+    gitdir = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'git')
+    gittarball = os.path.join(SERVICES_SRC_DIR, 'api', 'test', 'test.git.tar')
+    if not os.path.isdir(gitdir):
+        os.makedirs(gitdir)
+    subprocess.check_output(['tar', '-xC', gitdir, '-f', gittarball])
+
+    # The nginx proxy isn't listening here yet, but we need to choose
+    # the wss:// port now so we can write the API server config file.
+    wss_port = find_available_port()
+    _setport('wss', wss_port)
 
     port = find_available_port()
     env = os.environ.copy()
     env['RAILS_ENV'] = 'test'
 
     port = find_available_port()
     env = os.environ.copy()
     env['RAILS_ENV'] = 'test'
-    env['ARVADOS_WEBSOCKETS'] = 'yes'
+    env['ARVADOS_TEST_WSS_PORT'] = str(wss_port)
+    if env.get('ARVADOS_TEST_EXPERIMENTAL_WS'):
+        env.pop('ARVADOS_WEBSOCKETS', None)
+    else:
+        env['ARVADOS_WEBSOCKETS'] = 'yes'
     env.pop('ARVADOS_TEST_API_HOST', None)
     env.pop('ARVADOS_API_HOST', None)
     env.pop('ARVADOS_API_HOST_INSECURE', None)
     env.pop('ARVADOS_TEST_API_HOST', None)
     env.pop('ARVADOS_API_HOST', None)
     env.pop('ARVADOS_API_HOST_INSECURE', None)
@@ -190,7 +305,7 @@ def run(leave_running_atexit=False):
     start_msg = subprocess.check_output(
         ['bundle', 'exec',
          'passenger', 'start', '-d', '-p{}'.format(port),
     start_msg = subprocess.check_output(
         ['bundle', 'exec',
          'passenger', 'start', '-d', '-p{}'.format(port),
-         '--pid-file', os.path.join(os.getcwd(), pid_file),
+         '--pid-file', pid_file,
          '--log-file', os.path.join(os.getcwd(), 'log/test.log'),
          '--ssl',
          '--ssl-certificate', 'tmp/self-signed.pem',
          '--log-file', os.path.join(os.getcwd(), 'log/test.log'),
          '--ssl',
          '--ssl-certificate', 'tmp/self-signed.pem',
@@ -207,8 +322,10 @@ def run(leave_running_atexit=False):
     my_api_host = match.group(1)
     os.environ['ARVADOS_API_HOST'] = my_api_host
 
     my_api_host = match.group(1)
     os.environ['ARVADOS_API_HOST'] = my_api_host
 
-    # Make sure the server has written its pid file before continuing
+    # Make sure the server has written its pid file and started
+    # listening on its TCP port
     find_server_pid(pid_file)
     find_server_pid(pid_file)
+    _wait_until_port_listens(port)
 
     reset()
     os.chdir(restore_cwd)
 
     reset()
     os.chdir(restore_cwd)
@@ -250,51 +367,102 @@ def stop(force=False):
     """
     global my_api_host
     if force or my_api_host is not None:
     """
     global my_api_host
     if force or my_api_host is not None:
-        kill_server_pid(os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH))
+        kill_server_pid(_pidfile('api'))
         my_api_host = None
 
         my_api_host = None
 
+def run_ws():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    stop_ws()
+    port = find_available_port()
+    conf = os.path.join(TEST_TMPDIR, 'ws.yml')
+    with open(conf, 'w') as f:
+        f.write("""
+Client:
+  APIHost: {}
+  Insecure: true
+Listen: :{}
+LogLevel: {}
+Postgres:
+  host: {}
+  dbname: {}
+  user: {}
+  password: {}
+  sslmode: require
+        """.format(os.environ['ARVADOS_API_HOST'],
+                   port,
+                   ('info' if os.environ.get('ARVADOS_DEBUG', '') in ['','0'] else 'debug'),
+                   _dbconfig('host'),
+                   _dbconfig('database'),
+                   _dbconfig('username'),
+                   _dbconfig('password')))
+    logf = open(_fifo2stderr('ws'), 'w')
+    ws = subprocess.Popen(
+        ["ws", "-config", conf],
+        stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+    with open(_pidfile('ws'), 'w') as f:
+        f.write(str(ws.pid))
+    _wait_until_port_listens(port)
+    _setport('ws', port)
+    return port
+
+def stop_ws():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('ws'))
+
 def _start_keep(n, keep_args):
     keep0 = tempfile.mkdtemp()
     port = find_available_port()
     keep_cmd = ["keepstore",
 def _start_keep(n, keep_args):
     keep0 = tempfile.mkdtemp()
     port = find_available_port()
     keep_cmd = ["keepstore",
-                "-volumes={}".format(keep0),
+                "-volume={}".format(keep0),
                 "-listen=:{}".format(port),
                 "-listen=:{}".format(port),
-                "-pid={}".format("{}/keep{}.pid".format(TEST_TMPDIR, n))]
+                "-pid="+_pidfile('keep{}'.format(n))]
 
     for arg, val in keep_args.iteritems():
         keep_cmd.append("{}={}".format(arg, val))
 
 
     for arg, val in keep_args.iteritems():
         keep_cmd.append("{}={}".format(arg, val))
 
-    kp0 = subprocess.Popen(keep_cmd)
-    with open("{}/keep{}.pid".format(TEST_TMPDIR, n), 'w') as f:
+    logf = open(_fifo2stderr('keep{}'.format(n)), 'w')
+    kp0 = subprocess.Popen(
+        keep_cmd, stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
+
+    with open(_pidfile('keep{}'.format(n)), 'w') as f:
         f.write(str(kp0.pid))
 
     with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'w') as f:
         f.write(keep0)
 
         f.write(str(kp0.pid))
 
     with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'w') as f:
         f.write(keep0)
 
+    _wait_until_port_listens(port)
+
     return port
 
     return port
 
-def run_keep(blob_signing_key=None, enforce_permissions=False):
-    stop_keep()
+def run_keep(blob_signing_key=None, enforce_permissions=False, num_servers=2):
+    stop_keep(num_servers)
 
     keep_args = {}
 
     keep_args = {}
-    if blob_signing_key:
-        with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
-            keep_args['--permission-key-file'] = f.name
-            f.write(blob_signing_key)
-    if enforce_permissions:
-        keep_args['--enforce-permissions'] = 'true'
+    if not blob_signing_key:
+        blob_signing_key = 'zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc'
+    with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
+        keep_args['-blob-signing-key-file'] = f.name
+        f.write(blob_signing_key)
+    keep_args['-enforce-permissions'] = str(enforce_permissions).lower()
+    with open(os.path.join(TEST_TMPDIR, "keep.data-manager-token-file"), "w") as f:
+        keep_args['-data-manager-token-file'] = f.name
+        f.write(auth_token('data_manager'))
+    keep_args['-never-delete'] = 'false'
 
     api = arvados.api(
 
     api = arvados.api(
-        'v1', cache=False,
+        version='v1',
         host=os.environ['ARVADOS_API_HOST'],
         token=os.environ['ARVADOS_API_TOKEN'],
         insecure=True)
         host=os.environ['ARVADOS_API_HOST'],
         token=os.environ['ARVADOS_API_TOKEN'],
         insecure=True)
-    for d in api.keep_services().list().execute()['items']:
+
+    for d in api.keep_services().list(filters=[['service_type','=','disk']]).execute()['items']:
         api.keep_services().delete(uuid=d['uuid']).execute()
     for d in api.keep_disks().list().execute()['items']:
         api.keep_disks().delete(uuid=d['uuid']).execute()
 
         api.keep_services().delete(uuid=d['uuid']).execute()
     for d in api.keep_disks().list().execute()['items']:
         api.keep_disks().delete(uuid=d['uuid']).execute()
 
-    for d in range(0, 2):
+    for d in range(0, num_servers):
         port = _start_keep(d, keep_args)
         svc = api.keep_services().create(body={'keep_service': {
             'uuid': 'zzzzz-bi6l4-keepdisk{:07d}'.format(d),
         port = _start_keep(d, keep_args)
         svc = api.keep_services().create(body={'keep_service': {
             'uuid': 'zzzzz-bi6l4-keepdisk{:07d}'.format(d),
@@ -307,8 +475,17 @@ def run_keep(blob_signing_key=None, enforce_permissions=False):
             'keep_disk': {'keep_service_uuid': svc['uuid'] }
         }).execute()
 
             'keep_disk': {'keep_service_uuid': svc['uuid'] }
         }).execute()
 
+    # If keepproxy is running, send SIGHUP to make it discover the new
+    # keepstore services.
+    proxypidfile = _pidfile('keepproxy')
+    if os.path.exists(proxypidfile):
+        try:
+            os.kill(int(open(proxypidfile).read()), signal.SIGHUP)
+        except OSError:
+            os.remove(proxypidfile)
+
 def _stop_keep(n):
 def _stop_keep(n):
-    kill_server_pid("{}/keep{}.pid".format(TEST_TMPDIR, n), 0)
+    kill_server_pid(_pidfile('keep{}'.format(n)))
     if os.path.exists("{}/keep{}.volume".format(TEST_TMPDIR, n)):
         with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'r') as r:
             shutil.rmtree(r.read(), True)
     if os.path.exists("{}/keep{}.volume".format(TEST_TMPDIR, n)):
         with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'r') as r:
             shutil.rmtree(r.read(), True)
@@ -316,27 +493,29 @@ def _stop_keep(n):
     if os.path.exists(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")):
         os.remove(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"))
 
     if os.path.exists(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")):
         os.remove(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"))
 
-def stop_keep():
-    _stop_keep(0)
-    _stop_keep(1)
+def stop_keep(num_servers=2):
+    for n in range(0, num_servers):
+        _stop_keep(n)
 
 def run_keep_proxy():
 
 def run_keep_proxy():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
     stop_keep_proxy()
 
     stop_keep_proxy()
 
-    admin_token = auth_token('admin')
     port = find_available_port()
     env = os.environ.copy()
     port = find_available_port()
     env = os.environ.copy()
-    env['ARVADOS_API_TOKEN'] = admin_token
+    env['ARVADOS_API_TOKEN'] = auth_token('anonymous')
+    logf = open(_fifo2stderr('keepproxy'), 'w')
     kp = subprocess.Popen(
         ['keepproxy',
     kp = subprocess.Popen(
         ['keepproxy',
-         '-pid={}/keepproxy.pid'.format(TEST_TMPDIR),
+         '-pid='+_pidfile('keepproxy'),
          '-listen=:{}'.format(port)],
          '-listen=:{}'.format(port)],
-        env=env)
+        env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
 
     api = arvados.api(
 
     api = arvados.api(
-        'v1', cache=False,
+        version='v1',
         host=os.environ['ARVADOS_API_HOST'],
         host=os.environ['ARVADOS_API_HOST'],
-        token=admin_token,
+        token=auth_token('admin'),
         insecure=True)
     for d in api.keep_services().list(
             filters=[['service_type','=','proxy']]).execute()['items']:
         insecure=True)
     for d in api.keep_services().list(
             filters=[['service_type','=','proxy']]).execute()['items']:
@@ -347,10 +526,150 @@ def run_keep_proxy():
         'service_type': 'proxy',
         'service_ssl_flag': False,
     }}).execute()
         'service_type': 'proxy',
         'service_ssl_flag': False,
     }}).execute()
-    os.environ["ARVADOS_KEEP_PROXY"] = "http://localhost:{}".format(port)
+    os.environ["ARVADOS_KEEP_SERVICES"] = "http://localhost:{}".format(port)
+    _setport('keepproxy', port)
+    _wait_until_port_listens(port)
 
 def stop_keep_proxy():
 
 def stop_keep_proxy():
-    kill_server_pid(os.path.join(TEST_TMPDIR, "keepproxy.pid"), 0)
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('keepproxy'))
+
+def run_arv_git_httpd():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    stop_arv_git_httpd()
+
+    gitdir = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'git')
+    gitport = find_available_port()
+    env = os.environ.copy()
+    env.pop('ARVADOS_API_TOKEN', None)
+    logf = open(_fifo2stderr('arv-git-httpd'), 'w')
+    agh = subprocess.Popen(
+        ['arv-git-httpd',
+         '-repo-root='+gitdir+'/test',
+         '-address=:'+str(gitport)],
+        env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf)
+    with open(_pidfile('arv-git-httpd'), 'w') as f:
+        f.write(str(agh.pid))
+    _setport('arv-git-httpd', gitport)
+    _wait_until_port_listens(gitport)
+
+def stop_arv_git_httpd():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('arv-git-httpd'))
+
+def run_keep_web():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    stop_keep_web()
+
+    keepwebport = find_available_port()
+    env = os.environ.copy()
+    env['ARVADOS_API_TOKEN'] = auth_token('anonymous')
+    logf = open(_fifo2stderr('keep-web'), 'w')
+    keepweb = subprocess.Popen(
+        ['keep-web',
+         '-allow-anonymous',
+         '-attachment-only-host=download:'+str(keepwebport),
+         '-listen=:'+str(keepwebport)],
+        env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf)
+    with open(_pidfile('keep-web'), 'w') as f:
+        f.write(str(keepweb.pid))
+    _setport('keep-web', keepwebport)
+    _wait_until_port_listens(keepwebport)
+
+def stop_keep_web():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('keep-web'))
+
+def run_nginx():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    stop_nginx()
+    nginxconf = {}
+    nginxconf['KEEPWEBPORT'] = _getport('keep-web')
+    nginxconf['KEEPWEBDLSSLPORT'] = find_available_port()
+    nginxconf['KEEPWEBSSLPORT'] = find_available_port()
+    nginxconf['KEEPPROXYPORT'] = _getport('keepproxy')
+    nginxconf['KEEPPROXYSSLPORT'] = find_available_port()
+    nginxconf['GITPORT'] = _getport('arv-git-httpd')
+    nginxconf['GITSSLPORT'] = find_available_port()
+    nginxconf['WSPORT'] = _getport('ws')
+    nginxconf['WSSPORT'] = _getport('wss')
+    nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
+    nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
+    nginxconf['ACCESSLOG'] = _fifo2stderr('nginx_access_log')
+
+    conftemplatefile = os.path.join(MY_DIRNAME, 'nginx.conf')
+    conffile = os.path.join(TEST_TMPDIR, 'nginx.conf')
+    with open(conffile, 'w') as f:
+        f.write(re.sub(
+            r'{{([A-Z]+)}}',
+            lambda match: str(nginxconf.get(match.group(1))),
+            open(conftemplatefile).read()))
+
+    env = os.environ.copy()
+    env['PATH'] = env['PATH']+':/sbin:/usr/sbin:/usr/local/sbin'
+
+    nginx = subprocess.Popen(
+        ['nginx',
+         '-g', 'error_log stderr info;',
+         '-g', 'pid '+_pidfile('nginx')+';',
+         '-c', conffile],
+        env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+    _setport('keep-web-dl-ssl', nginxconf['KEEPWEBDLSSLPORT'])
+    _setport('keep-web-ssl', nginxconf['KEEPWEBSSLPORT'])
+    _setport('keepproxy-ssl', nginxconf['KEEPPROXYSSLPORT'])
+    _setport('arv-git-httpd-ssl', nginxconf['GITSSLPORT'])
+
+def stop_nginx():
+    if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
+        return
+    kill_server_pid(_pidfile('nginx'))
+
+def _pidfile(program):
+    return os.path.join(TEST_TMPDIR, program + '.pid')
+
+def _portfile(program):
+    return os.path.join(TEST_TMPDIR, program + '.port')
+
+def _setport(program, port):
+    with open(_portfile(program), 'w') as f:
+        f.write(str(port))
+
+# Returns 9 if program is not up.
+def _getport(program):
+    try:
+        return int(open(_portfile(program)).read())
+    except IOError:
+        return 9
+
+def _dbconfig(key):
+    global _cached_db_config
+    if not _cached_db_config:
+        _cached_db_config = yaml.load(open(os.path.join(
+            SERVICES_SRC_DIR, 'api', 'config', 'database.yml')))
+    return _cached_db_config['test'][key]
+
+def _apiconfig(key):
+    global _cached_config
+    if _cached_config:
+        return _cached_config[key]
+    def _load(f, required=True):
+        fullpath = os.path.join(SERVICES_SRC_DIR, 'api', 'config', f)
+        if not required and not os.path.exists(fullpath):
+            return {}
+        return yaml.load(fullpath)
+    cdefault = _load('application.default.yml')
+    csite = _load('application.yml', required=False)
+    _cached_config = {}
+    for section in [cdefault.get('common',{}), cdefault.get('test',{}),
+                    csite.get('common',{}), csite.get('test',{})]:
+        _cached_config.update(section)
+    return _cached_config[key]
 
 def fixture(fix):
     '''load a fixture yaml file'''
 
 def fixture(fix):
     '''load a fixture yaml file'''
@@ -390,8 +709,10 @@ class TestCaseWithServers(unittest.TestCase):
     original environment.
     """
     MAIN_SERVER = None
     original environment.
     """
     MAIN_SERVER = None
+    WS_SERVER = None
     KEEP_SERVER = None
     KEEP_PROXY_SERVER = None
     KEEP_SERVER = None
     KEEP_PROXY_SERVER = None
+    KEEP_WEB_SERVER = None
 
     @staticmethod
     def _restore_dict(src, dest):
 
     @staticmethod
     def _restore_dict(src, dest):
@@ -405,12 +726,14 @@ class TestCaseWithServers(unittest.TestCase):
         cls._orig_environ = os.environ.copy()
         cls._orig_config = arvados.config.settings().copy()
         cls._cleanup_funcs = []
         cls._orig_environ = os.environ.copy()
         cls._orig_config = arvados.config.settings().copy()
         cls._cleanup_funcs = []
-        os.environ.pop('ARVADOS_KEEP_PROXY', None)
+        os.environ.pop('ARVADOS_KEEP_SERVICES', None)
         os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
         for server_kwargs, start_func, stop_func in (
                 (cls.MAIN_SERVER, run, reset),
         os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
         for server_kwargs, start_func, stop_func in (
                 (cls.MAIN_SERVER, run, reset),
+                (cls.WS_SERVER, run_ws, stop_ws),
                 (cls.KEEP_SERVER, run_keep, stop_keep),
                 (cls.KEEP_SERVER, run_keep, stop_keep),
-                (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy)):
+                (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy),
+                (cls.KEEP_WEB_SERVER, run_keep_web, stop_keep_web)):
             if server_kwargs is not None:
                 start_func(**server_kwargs)
                 cls._cleanup_funcs.append(stop_func)
             if server_kwargs is not None:
                 start_func(**server_kwargs)
                 cls._cleanup_funcs.append(stop_func)
@@ -432,14 +755,28 @@ class TestCaseWithServers(unittest.TestCase):
 
 
 if __name__ == "__main__":
 
 
 if __name__ == "__main__":
-    actions = ['start', 'stop',
-               'start_keep', 'stop_keep',
-               'start_keep_proxy', 'stop_keep_proxy']
+    actions = [
+        'start', 'stop',
+        'start_ws', 'stop_ws',
+        'start_keep', 'stop_keep',
+        'start_keep_proxy', 'stop_keep_proxy',
+        'start_keep-web', 'stop_keep-web',
+        'start_arv-git-httpd', 'stop_arv-git-httpd',
+        'start_nginx', 'stop_nginx',
+    ]
     parser = argparse.ArgumentParser()
     parser.add_argument('action', type=str, help="one of {}".format(actions))
     parser.add_argument('--auth', type=str, metavar='FIXTURE_NAME', help='Print authorization info for given api_client_authorizations fixture')
     parser = argparse.ArgumentParser()
     parser.add_argument('action', type=str, help="one of {}".format(actions))
     parser.add_argument('--auth', type=str, metavar='FIXTURE_NAME', help='Print authorization info for given api_client_authorizations fixture')
+    parser.add_argument('--num-keep-servers', metavar='int', type=int, default=2, help="Number of keep servers desired")
+    parser.add_argument('--keep-enforce-permissions', action="store_true", help="Enforce keep permissions")
+
     args = parser.parse_args()
 
     args = parser.parse_args()
 
+    if args.action not in actions:
+        print("Unrecognized action '{}'. Actions are: {}.".
+              format(args.action, actions),
+              file=sys.stderr)
+        sys.exit(1)
     if args.action == 'start':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
         run(leave_running_atexit=True)
     if args.action == 'start':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
         run(leave_running_atexit=True)
@@ -453,13 +790,29 @@ if __name__ == "__main__":
             print(host)
     elif args.action == 'stop':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
             print(host)
     elif args.action == 'stop':
         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
+    elif args.action == 'start_ws':
+        run_ws()
+    elif args.action == 'stop_ws':
+        stop_ws()
     elif args.action == 'start_keep':
     elif args.action == 'start_keep':
-        run_keep()
+        run_keep(enforce_permissions=args.keep_enforce_permissions, num_servers=args.num_keep_servers)
     elif args.action == 'stop_keep':
     elif args.action == 'stop_keep':
-        stop_keep()
+        stop_keep(num_servers=args.num_keep_servers)
     elif args.action == 'start_keep_proxy':
         run_keep_proxy()
     elif args.action == 'stop_keep_proxy':
         stop_keep_proxy()
     elif args.action == 'start_keep_proxy':
         run_keep_proxy()
     elif args.action == 'stop_keep_proxy':
         stop_keep_proxy()
+    elif args.action == 'start_arv-git-httpd':
+        run_arv_git_httpd()
+    elif args.action == 'stop_arv-git-httpd':
+        stop_arv_git_httpd()
+    elif args.action == 'start_keep-web':
+        run_keep_web()
+    elif args.action == 'stop_keep-web':
+        stop_keep_web()
+    elif args.action == 'start_nginx':
+        run_nginx()
+    elif args.action == 'stop_nginx':
+        stop_nginx()
     else:
     else:
-        print("Unrecognized action '{}'. Actions are: {}.".format(args.action, actions))
+        raise Exception("action recognized but not implemented!?")