4823: Refactoring. ReadOnly Collection is now CollectionReader, replacing old
[arvados.git] / sdk / python / tests / run_test_server.py
1 #!/usr/bin/env python
2
3 import argparse
4 import atexit
5 import httplib2
6 import os
7 import pipes
8 import random
9 import re
10 import shutil
11 import signal
12 import socket
13 import subprocess
14 import string
15 import sys
16 import tempfile
17 import time
18 import unittest
19 import yaml
20
21 MY_DIRNAME = os.path.dirname(os.path.realpath(__file__))
22 if __name__ == '__main__' and os.path.exists(
23       os.path.join(MY_DIRNAME, '..', 'arvados', '__init__.py')):
24     # We're being launched to support another test suite.
25     # Add the Python SDK source to the library path.
26     sys.path.insert(1, os.path.dirname(MY_DIRNAME))
27
28 import arvados.api
29 import arvados.config
30
31 ARVADOS_DIR = os.path.realpath(os.path.join(MY_DIRNAME, '../../..'))
32 SERVICES_SRC_DIR = os.path.join(ARVADOS_DIR, 'services')
33 SERVER_PID_PATH = 'tmp/pids/test-server.pid'
34 if 'GOPATH' in os.environ:
35     gopaths = os.environ['GOPATH'].split(':')
36     gobins = [os.path.join(path, 'bin') for path in gopaths]
37     os.environ['PATH'] = ':'.join(gobins) + ':' + os.environ['PATH']
38
39 TEST_TMPDIR = os.path.join(ARVADOS_DIR, 'tmp')
40 if not os.path.exists(TEST_TMPDIR):
41     os.mkdir(TEST_TMPDIR)
42
43 my_api_host = None
44
45 def find_server_pid(PID_PATH, wait=10):
46     now = time.time()
47     timeout = now + wait
48     good_pid = False
49     while (not good_pid) and (now <= timeout):
50         time.sleep(0.2)
51         try:
52             with open(PID_PATH, 'r') as f:
53                 server_pid = int(f.read())
54             good_pid = (os.kill(server_pid, 0) is None)
55         except IOError:
56             good_pid = False
57         except OSError:
58             good_pid = False
59         now = time.time()
60
61     if not good_pid:
62         return None
63
64     return server_pid
65
66 def kill_server_pid(pidfile, wait=10, passenger_root=False):
67     # Must re-import modules in order to work during atexit
68     import os
69     import signal
70     import subprocess
71     import time
72     try:
73         if passenger_root:
74             # First try to shut down nicely
75             restore_cwd = os.getcwd()
76             os.chdir(passenger_root)
77             subprocess.call([
78                 'bundle', 'exec', 'passenger', 'stop', '--pid-file', pidfile])
79             os.chdir(restore_cwd)
80         now = time.time()
81         timeout = now + wait
82         with open(pidfile, 'r') as f:
83             server_pid = int(f.read())
84         while now <= timeout:
85             if not passenger_root or timeout - now < wait / 2:
86                 # Half timeout has elapsed. Start sending SIGTERM
87                 os.kill(server_pid, signal.SIGTERM)
88             # Raise OSError if process has disappeared
89             os.getpgid(server_pid)
90             time.sleep(0.1)
91             now = time.time()
92     except IOError:
93         pass
94     except OSError:
95         pass
96
97 def find_available_port():
98     """Return an IPv4 port number that is not in use right now.
99
100     We assume whoever needs to use the returned port is able to reuse
101     a recently used port without waiting for TIME_WAIT (see
102     SO_REUSEADDR / SO_REUSEPORT).
103
104     Some opportunity for races here, but it's better than choosing
105     something at random and not checking at all. If all of our servers
106     (hey Passenger) knew that listening on port 0 was a thing, the OS
107     would take care of the races, and this wouldn't be needed at all.
108     """
109
110     sock = socket.socket()
111     sock.bind(('0.0.0.0', 0))
112     port = sock.getsockname()[1]
113     sock.close()
114     return port
115
116 def run(leave_running_atexit=False):
117     """Ensure an API server is running, and ARVADOS_API_* env vars have
118     admin credentials for it.
119
120     If ARVADOS_TEST_API_HOST is set, a parent process has started a
121     test server for us to use: we just need to reset() it using the
122     admin token fixture.
123
124     If a previous call to run() started a new server process, and it
125     is still running, we just need to reset() it to fixture state and
126     return.
127
128     If neither of those options work out, we'll really start a new
129     server.
130     """
131     global my_api_host
132
133     # Delete cached discovery document.
134     shutil.rmtree(arvados.http_cache('discovery'))
135
136     pid_file = os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH)
137     pid_file_ok = find_server_pid(pid_file, 0)
138
139     existing_api_host = os.environ.get('ARVADOS_TEST_API_HOST', my_api_host)
140     if existing_api_host and pid_file_ok:
141         if existing_api_host == my_api_host:
142             try:
143                 return reset()
144             except:
145                 # Fall through to shutdown-and-start case.
146                 pass
147         else:
148             # Server was provided by parent. Can't recover if it's
149             # unresettable.
150             return reset()
151
152     # Before trying to start up our own server, call stop() to avoid
153     # "Phusion Passenger Standalone is already running on PID 12345".
154     # (If we've gotten this far, ARVADOS_TEST_API_HOST isn't set, so
155     # we know the server is ours to kill.)
156     stop(force=True)
157
158     restore_cwd = os.getcwd()
159     api_src_dir = os.path.join(SERVICES_SRC_DIR, 'api')
160     os.chdir(api_src_dir)
161
162     # Either we haven't started a server of our own yet, or it has
163     # died, or we have lost our credentials, or something else is
164     # preventing us from calling reset(). Start a new one.
165
166     if not os.path.exists('tmp/self-signed.pem'):
167         # We assume here that either passenger reports its listening
168         # address as https:/0.0.0.0:port/. If it reports "127.0.0.1"
169         # then the certificate won't match the host and reset() will
170         # fail certificate verification. If it reports "localhost",
171         # clients (notably Python SDK's websocket client) might
172         # resolve localhost as ::1 and then fail to connect.
173         subprocess.check_call([
174             'openssl', 'req', '-new', '-x509', '-nodes',
175             '-out', 'tmp/self-signed.pem',
176             '-keyout', 'tmp/self-signed.key',
177             '-days', '3650',
178             '-subj', '/CN=0.0.0.0'],
179         stdout=sys.stderr)
180
181     port = find_available_port()
182     env = os.environ.copy()
183     env['RAILS_ENV'] = 'test'
184     env['ARVADOS_WEBSOCKETS'] = 'yes'
185     env.pop('ARVADOS_TEST_API_HOST', None)
186     env.pop('ARVADOS_API_HOST', None)
187     env.pop('ARVADOS_API_HOST_INSECURE', None)
188     env.pop('ARVADOS_API_TOKEN', None)
189     start_msg = subprocess.check_output(
190         ['bundle', 'exec',
191          'passenger', 'start', '-d', '-p{}'.format(port),
192          '--pid-file', os.path.join(os.getcwd(), pid_file),
193          '--log-file', os.path.join(os.getcwd(), 'log/test.log'),
194          '--ssl',
195          '--ssl-certificate', 'tmp/self-signed.pem',
196          '--ssl-certificate-key', 'tmp/self-signed.key'],
197         env=env)
198
199     if not leave_running_atexit:
200         atexit.register(kill_server_pid, pid_file, passenger_root=api_src_dir)
201
202     match = re.search(r'Accessible via: https://(.*?)/', start_msg)
203     if not match:
204         raise Exception(
205             "Passenger did not report endpoint: {}".format(start_msg))
206     my_api_host = match.group(1)
207     os.environ['ARVADOS_API_HOST'] = my_api_host
208
209     # Make sure the server has written its pid file before continuing
210     find_server_pid(pid_file)
211
212     reset()
213     os.chdir(restore_cwd)
214
215 def reset():
216     """Reset the test server to fixture state.
217
218     This resets the ARVADOS_TEST_API_HOST provided by a parent process
219     if any, otherwise the server started by run().
220
221     It also resets ARVADOS_* environment vars to point to the test
222     server with admin credentials.
223     """
224     existing_api_host = os.environ.get('ARVADOS_TEST_API_HOST', my_api_host)
225     token = auth_token('admin')
226     httpclient = httplib2.Http(ca_certs=os.path.join(
227         SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem'))
228     httpclient.request(
229         'https://{}/database/reset'.format(existing_api_host),
230         'POST',
231         headers={'Authorization': 'OAuth2 {}'.format(token)})
232     os.environ['ARVADOS_API_HOST_INSECURE'] = 'true'
233     os.environ['ARVADOS_API_HOST'] = existing_api_host
234     os.environ['ARVADOS_API_TOKEN'] = token
235
236 def stop(force=False):
237     """Stop the API server, if one is running.
238
239     If force==False, kill it only if we started it ourselves. (This
240     supports the use case where a Python test suite calls run(), but
241     run() just uses the ARVADOS_TEST_API_HOST provided by the parent
242     process, and the test suite cleans up after itself by calling
243     stop(). In this case the test server provided by the parent
244     process should be left alone.)
245
246     If force==True, kill it even if we didn't start it
247     ourselves. (This supports the use case in __main__, where "run"
248     and "stop" happen in different processes.)
249     """
250     global my_api_host
251     if force or my_api_host is not None:
252         kill_server_pid(os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH))
253         my_api_host = None
254
255 def _start_keep(n, keep_args):
256     keep0 = tempfile.mkdtemp()
257     port = find_available_port()
258     keep_cmd = ["keepstore",
259                 "-volumes={}".format(keep0),
260                 "-listen=:{}".format(port),
261                 "-pid={}".format("{}/keep{}.pid".format(TEST_TMPDIR, n))]
262
263     for arg, val in keep_args.iteritems():
264         keep_cmd.append("{}={}".format(arg, val))
265
266     kp0 = subprocess.Popen(keep_cmd)
267     with open("{}/keep{}.pid".format(TEST_TMPDIR, n), 'w') as f:
268         f.write(str(kp0.pid))
269
270     with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'w') as f:
271         f.write(keep0)
272
273     return port
274
275 def run_keep(blob_signing_key=None, enforce_permissions=False):
276     stop_keep()
277
278     keep_args = {}
279     if blob_signing_key:
280         with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
281             keep_args['--permission-key-file'] = f.name
282             f.write(blob_signing_key)
283     if enforce_permissions:
284         keep_args['--enforce-permissions'] = 'true'
285
286     api = arvados.api(
287         version='v1',
288         host=os.environ['ARVADOS_API_HOST'],
289         token=os.environ['ARVADOS_API_TOKEN'],
290         insecure=True)
291     for d in api.keep_services().list().execute()['items']:
292         api.keep_services().delete(uuid=d['uuid']).execute()
293     for d in api.keep_disks().list().execute()['items']:
294         api.keep_disks().delete(uuid=d['uuid']).execute()
295
296     for d in range(0, 2):
297         port = _start_keep(d, keep_args)
298         svc = api.keep_services().create(body={'keep_service': {
299             'uuid': 'zzzzz-bi6l4-keepdisk{:07d}'.format(d),
300             'service_host': 'localhost',
301             'service_port': port,
302             'service_type': 'disk',
303             'service_ssl_flag': False,
304         }}).execute()
305         api.keep_disks().create(body={
306             'keep_disk': {'keep_service_uuid': svc['uuid'] }
307         }).execute()
308
309 def _stop_keep(n):
310     kill_server_pid("{}/keep{}.pid".format(TEST_TMPDIR, n), 0)
311     if os.path.exists("{}/keep{}.volume".format(TEST_TMPDIR, n)):
312         with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'r') as r:
313             shutil.rmtree(r.read(), True)
314         os.unlink("{}/keep{}.volume".format(TEST_TMPDIR, n))
315     if os.path.exists(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")):
316         os.remove(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"))
317
318 def stop_keep():
319     _stop_keep(0)
320     _stop_keep(1)
321
322 def run_keep_proxy():
323     stop_keep_proxy()
324
325     admin_token = auth_token('admin')
326     port = find_available_port()
327     env = os.environ.copy()
328     env['ARVADOS_API_TOKEN'] = admin_token
329     kp = subprocess.Popen(
330         ['keepproxy',
331          '-pid={}/keepproxy.pid'.format(TEST_TMPDIR),
332          '-listen=:{}'.format(port)],
333         env=env)
334
335     api = arvados.api(
336         version='v1',
337         host=os.environ['ARVADOS_API_HOST'],
338         token=admin_token,
339         insecure=True)
340     for d in api.keep_services().list(
341             filters=[['service_type','=','proxy']]).execute()['items']:
342         api.keep_services().delete(uuid=d['uuid']).execute()
343     api.keep_services().create(body={'keep_service': {
344         'service_host': 'localhost',
345         'service_port': port,
346         'service_type': 'proxy',
347         'service_ssl_flag': False,
348     }}).execute()
349     os.environ["ARVADOS_KEEP_PROXY"] = "http://localhost:{}".format(port)
350
351 def stop_keep_proxy():
352     kill_server_pid(os.path.join(TEST_TMPDIR, "keepproxy.pid"), 0)
353
354 def fixture(fix):
355     '''load a fixture yaml file'''
356     with open(os.path.join(SERVICES_SRC_DIR, 'api', "test", "fixtures",
357                            fix + ".yml")) as f:
358         yaml_file = f.read()
359         try:
360           trim_index = yaml_file.index("# Test Helper trims the rest of the file")
361           yaml_file = yaml_file[0:trim_index]
362         except ValueError:
363           pass
364         return yaml.load(yaml_file)
365
366 def auth_token(token_name):
367     return fixture("api_client_authorizations")[token_name]["api_token"]
368
369 def authorize_with(token_name):
370     '''token_name is the symbolic name of the token from the api_client_authorizations fixture'''
371     arvados.config.settings()["ARVADOS_API_TOKEN"] = auth_token(token_name)
372     arvados.config.settings()["ARVADOS_API_HOST"] = os.environ.get("ARVADOS_API_HOST")
373     arvados.config.settings()["ARVADOS_API_HOST_INSECURE"] = "true"
374
375 class TestCaseWithServers(unittest.TestCase):
376     """TestCase to start and stop supporting Arvados servers.
377
378     Define any of MAIN_SERVER, KEEP_SERVER, and/or KEEP_PROXY_SERVER
379     class variables as a dictionary of keyword arguments.  If you do,
380     setUpClass will start the corresponding servers by passing these
381     keyword arguments to the run, run_keep, and/or run_keep_server
382     functions, respectively.  It will also set Arvados environment
383     variables to point to these servers appropriately.  If you don't
384     run a Keep or Keep proxy server, setUpClass will set up a
385     temporary directory for Keep local storage, and set it as
386     KEEP_LOCAL_STORE.
387
388     tearDownClass will stop any servers started, and restore the
389     original environment.
390     """
391     MAIN_SERVER = None
392     KEEP_SERVER = None
393     KEEP_PROXY_SERVER = None
394
395     @staticmethod
396     def _restore_dict(src, dest):
397         for key in dest.keys():
398             if key not in src:
399                 del dest[key]
400         dest.update(src)
401
402     @classmethod
403     def setUpClass(cls):
404         cls._orig_environ = os.environ.copy()
405         cls._orig_config = arvados.config.settings().copy()
406         cls._cleanup_funcs = []
407         os.environ.pop('ARVADOS_KEEP_PROXY', None)
408         os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
409         for server_kwargs, start_func, stop_func in (
410                 (cls.MAIN_SERVER, run, reset),
411                 (cls.KEEP_SERVER, run_keep, stop_keep),
412                 (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy)):
413             if server_kwargs is not None:
414                 start_func(**server_kwargs)
415                 cls._cleanup_funcs.append(stop_func)
416         if (cls.KEEP_SERVER is None) and (cls.KEEP_PROXY_SERVER is None):
417             cls.local_store = tempfile.mkdtemp()
418             os.environ['KEEP_LOCAL_STORE'] = cls.local_store
419             cls._cleanup_funcs.append(
420                 lambda: shutil.rmtree(cls.local_store, ignore_errors=True))
421         else:
422             os.environ.pop('KEEP_LOCAL_STORE', None)
423         arvados.config.initialize()
424
425     @classmethod
426     def tearDownClass(cls):
427         for clean_func in cls._cleanup_funcs:
428             clean_func()
429         cls._restore_dict(cls._orig_environ, os.environ)
430         cls._restore_dict(cls._orig_config, arvados.config.settings())
431
432
433 if __name__ == "__main__":
434     actions = ['start', 'stop',
435                'start_keep', 'stop_keep',
436                'start_keep_proxy', 'stop_keep_proxy']
437     parser = argparse.ArgumentParser()
438     parser.add_argument('action', type=str, help="one of {}".format(actions))
439     parser.add_argument('--auth', type=str, metavar='FIXTURE_NAME', help='Print authorization info for given api_client_authorizations fixture')
440     args = parser.parse_args()
441
442     if args.action == 'start':
443         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
444         run(leave_running_atexit=True)
445         host = os.environ['ARVADOS_API_HOST']
446         if args.auth is not None:
447             token = auth_token(args.auth)
448             print("export ARVADOS_API_TOKEN={}".format(pipes.quote(token)))
449             print("export ARVADOS_API_HOST={}".format(pipes.quote(host)))
450             print("export ARVADOS_API_HOST_INSECURE=true")
451         else:
452             print(host)
453     elif args.action == 'stop':
454         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
455     elif args.action == 'start_keep':
456         run_keep()
457     elif args.action == 'stop_keep':
458         stop_keep()
459     elif args.action == 'start_keep_proxy':
460         run_keep_proxy()
461     elif args.action == 'stop_keep_proxy':
462         stop_keep_proxy()
463     else:
464         print("Unrecognized action '{}'. Actions are: {}.".format(args.action, actions))