3021: In start(), if a stale server is already running (but we can't reset() it)...
[arvados.git] / sdk / python / tests / run_test_server.py
1 #!/usr/bin/env python
2
3 import argparse
4 import atexit
5 import httplib2
6 import os
7 import pipes
8 import random
9 import re
10 import shutil
11 import signal
12 import subprocess
13 import string
14 import sys
15 import tempfile
16 import time
17 import unittest
18 import yaml
19
20 MY_DIRNAME = os.path.dirname(os.path.realpath(__file__))
21 if __name__ == '__main__' and os.path.exists(
22       os.path.join(MY_DIRNAME, '..', 'arvados', '__init__.py')):
23     # We're being launched to support another test suite.
24     # Add the Python SDK source to the library path.
25     sys.path.insert(1, os.path.dirname(MY_DIRNAME))
26
27 import arvados.api
28 import arvados.config
29
30 ARVADOS_DIR = os.path.realpath(os.path.join(MY_DIRNAME, '../../..'))
31 SERVICES_SRC_DIR = os.path.join(ARVADOS_DIR, 'services')
32 SERVER_PID_PATH = 'tmp/pids/test-server.pid'
33 if 'GOPATH' in os.environ:
34     gopaths = os.environ['GOPATH'].split(':')
35     gobins = [os.path.join(path, 'bin') for path in gopaths]
36     os.environ['PATH'] = ':'.join(gobins) + ':' + os.environ['PATH']
37
38 TEST_TMPDIR = os.path.join(ARVADOS_DIR, 'tmp')
39 if not os.path.exists(TEST_TMPDIR):
40     os.mkdir(TEST_TMPDIR)
41
42 my_api_host = None
43
44 def find_server_pid(PID_PATH, wait=10):
45     now = time.time()
46     timeout = now + wait
47     good_pid = False
48     while (not good_pid) and (now <= timeout):
49         time.sleep(0.2)
50         try:
51             with open(PID_PATH, 'r') as f:
52                 server_pid = int(f.read())
53             good_pid = (os.kill(server_pid, 0) is None)
54         except IOError:
55             good_pid = False
56         except OSError:
57             good_pid = False
58         now = time.time()
59
60     if not good_pid:
61         return None
62
63     return server_pid
64
65 def kill_server_pid(pidfile, wait=10, passenger_root=False):
66     # Must re-import modules in order to work during atexit
67     import os
68     import signal
69     import subprocess
70     import time
71     try:
72         if passenger_root:
73             # First try to shut down nicely
74             restore_cwd = os.getcwd()
75             os.chdir(passenger_root)
76             subprocess.call([
77                 'bundle', 'exec', 'passenger', 'stop', '--pid-file', pidfile])
78             os.chdir(restore_cwd)
79         now = time.time()
80         timeout = now + wait
81         with open(pidfile, 'r') as f:
82             server_pid = int(f.read())
83         while now <= timeout:
84             if not passenger_root or timeout - now < wait / 2:
85                 # Half timeout has elapsed. Start sending SIGTERM
86                 os.kill(server_pid, signal.SIGTERM)
87             # Raise OSError if process has disappeared
88             os.getpgid(server_pid)
89             time.sleep(0.1)
90             now = time.time()
91     except IOError:
92         pass
93     except OSError:
94         pass
95
96 def find_available_port():
97     """Return a port number that is not in use right now.
98
99     Some opportunity for races here, but it's better than choosing
100     something at random and not checking at all. If all of our servers
101     (hey Passenger) knew that listening on port 0 was a thing, the OS
102     would take care of the races, and this wouldn't be needed at all.
103     """
104     port = None
105     while port is None:
106         port = random.randint(20000, 40000)
107         port_hex = ':%04x ' % port
108         try:
109             with open('/proc/net/tcp', 'r') as f:
110                 for line in f:
111                     if 0 <= string.find(line, port_hex):
112                         port = None
113                         break
114         except OSError:
115             # This isn't going so well. Just use the random port.
116             pass
117         except IOError:
118             pass
119     return port
120
121 def run(leave_running_atexit=False):
122     """Ensure an API server is running, and ARVADOS_API_* env vars have
123     admin credentials for it.
124
125     If ARVADOS_TEST_API_HOST is set, a parent process has started a
126     test server for us to use: we just need to reset() it using the
127     admin token fixture.
128
129     If a previous call to run() started a new server process, and it
130     is still running, we just need to reset() it to fixture state and
131     return.
132
133     If neither of those options work out, we'll really start a new
134     server.
135     """
136     global my_api_host
137
138     # Delete cached discovery document.
139     shutil.rmtree(arvados.http_cache('discovery'))
140
141     os.environ['ARVADOS_API_TOKEN'] = auth_token('admin')
142     os.environ['ARVADOS_API_HOST_INSECURE'] = 'true'
143
144     pid_file = os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH)
145     pid_file_ok = find_server_pid(pid_file, 0)
146
147     existing_api_host = os.environ.get('ARVADOS_TEST_API_HOST', my_api_host)
148     if existing_api_host and pid_file_ok:
149         try:
150             os.environ['ARVADOS_API_HOST'] = existing_api_host
151             reset()
152             return
153         except:
154             pass
155
156     # Before trying to start up our own server, call stop() to avoid
157     # "Phusion Passenger Standalone is already running on PID 12345".
158     # We want to kill it if it's our own _or_ it's some stale
159     # left-over server. But if it's been deliberately provided to us
160     # by a parent process, we don't want to force-kill it. That'll
161     # just wreck things for the next test suite that tries to use it.
162     stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
163
164     restore_cwd = os.getcwd()
165     api_src_dir = os.path.join(SERVICES_SRC_DIR, 'api')
166     os.chdir(api_src_dir)
167
168     # Either we haven't started a server of our own yet, or it has
169     # died, or we have lost our credentials, or something else is
170     # preventing us from calling reset(). Start a new one.
171
172     if not os.path.exists('tmp/self-signed.pem'):
173         # We assume here that either passenger reports its listening
174         # address as https:/0.0.0.0:port/. If it reports "127.0.0.1"
175         # then the certificate won't match the host and reset() will
176         # fail certificate verification. If it reports "localhost",
177         # clients (notably Python SDK's websocket client) might
178         # resolve localhost as ::1 and then fail to connect.
179         subprocess.check_call([
180             'openssl', 'req', '-new', '-x509', '-nodes',
181             '-out', 'tmp/self-signed.pem',
182             '-keyout', 'tmp/self-signed.key',
183             '-days', '3650',
184             '-subj', '/CN=0.0.0.0'])
185
186     port = find_available_port()
187     env = os.environ.copy()
188     env['RAILS_ENV'] = 'test'
189     env['ARVADOS_WEBSOCKETS'] = 'yes'
190     env.pop('ARVADOS_TEST_API_HOST', None)
191     env.pop('ARVADOS_API_HOST', None)
192     env.pop('ARVADOS_API_HOST_INSECURE', None)
193     env.pop('ARVADOS_API_TOKEN', None)
194     start_msg = subprocess.check_output(
195         ['bundle', 'exec',
196          'passenger', 'start', '-d', '-p{}'.format(port),
197          '--pid-file', os.path.join(os.getcwd(), pid_file),
198          '--log-file', os.path.join(os.getcwd(), 'log/test.log'),
199          '--ssl',
200          '--ssl-certificate', 'tmp/self-signed.pem',
201          '--ssl-certificate-key', 'tmp/self-signed.key'],
202         env=env)
203
204     if not leave_running_atexit:
205         atexit.register(kill_server_pid, pid_file, passenger_root=api_src_dir)
206
207     match = re.search(r'Accessible via: https://(.*?)/', start_msg)
208     if not match:
209         raise Exception(
210             "Passenger did not report endpoint: {}".format(start_msg))
211     my_api_host = match.group(1)
212     os.environ['ARVADOS_API_HOST'] = my_api_host
213
214     # Make sure the server has written its pid file before continuing
215     find_server_pid(pid_file)
216
217     reset()
218     os.chdir(restore_cwd)
219
220 def reset():
221     """Reset the test server to fixture state.
222
223     This resets the ARVADOS_TEST_API_HOST provided by a parent process
224     if any, otherwise the server started by run().
225     """
226     existing_api_host = os.environ.get('ARVADOS_TEST_API_HOST', my_api_host)
227     token = auth_token('admin')
228     httpclient = httplib2.Http(ca_certs=os.path.join(
229         SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem'))
230     httpclient.request(
231         'https://{}/database/reset'.format(existing_api_host),
232         'POST',
233         headers={'Authorization': 'OAuth2 {}'.format(token)})
234
235 def stop(force=False):
236     """Stop the API server, if one is running.
237
238     If force==False, kill it only if we started it ourselves. (This
239     supports the use case where a Python test suite calls run(), but
240     run() just uses the ARVADOS_TEST_API_HOST provided by the parent
241     process, and the test suite cleans up after itself by calling
242     stop(). In this case the test server provided by the parent
243     process should be left alone.)
244
245     If force==True, kill it even if we didn't start it
246     ourselves. (This supports the use case in __main__, where "run"
247     and "stop" happen in different processes.)
248     """
249     global my_api_host
250     if force or my_api_host is not None:
251         kill_server_pid(os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH))
252         my_api_host = None
253
254 def _start_keep(n, keep_args):
255     keep0 = tempfile.mkdtemp()
256     port = find_available_port()
257     keep_cmd = ["keepstore",
258                 "-volumes={}".format(keep0),
259                 "-listen=:{}".format(port),
260                 "-pid={}".format("{}/keep{}.pid".format(TEST_TMPDIR, n))]
261
262     for arg, val in keep_args.iteritems():
263         keep_cmd.append("{}={}".format(arg, val))
264
265     kp0 = subprocess.Popen(keep_cmd)
266     with open("{}/keep{}.pid".format(TEST_TMPDIR, n), 'w') as f:
267         f.write(str(kp0.pid))
268
269     with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'w') as f:
270         f.write(keep0)
271
272     return port
273
274 def run_keep(blob_signing_key=None, enforce_permissions=False):
275     stop_keep()
276
277     keep_args = {}
278     if blob_signing_key:
279         with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
280             keep_args['--permission-key-file'] = f.name
281             f.write(blob_signing_key)
282     if enforce_permissions:
283         keep_args['--enforce-permissions'] = 'true'
284
285     api = arvados.api(
286         'v1', cache=False,
287         host=os.environ['ARVADOS_API_HOST'],
288         token=os.environ['ARVADOS_API_TOKEN'],
289         insecure=True)
290     for d in api.keep_services().list().execute()['items']:
291         api.keep_services().delete(uuid=d['uuid']).execute()
292     for d in api.keep_disks().list().execute()['items']:
293         api.keep_disks().delete(uuid=d['uuid']).execute()
294
295     for d in range(0, 2):
296         port = _start_keep(d, keep_args)
297         svc = api.keep_services().create(body={'keep_service': {
298             'uuid': 'zzzzz-bi6l4-keepdisk{:07d}'.format(d),
299             'service_host': 'localhost',
300             'service_port': port,
301             'service_type': 'disk',
302             'service_ssl_flag': False,
303         }}).execute()
304         api.keep_disks().create(body={
305             'keep_disk': {'keep_service_uuid': svc['uuid'] }
306         }).execute()
307
308 def _stop_keep(n):
309     kill_server_pid("{}/keep{}.pid".format(TEST_TMPDIR, n), 0)
310     if os.path.exists("{}/keep{}.volume".format(TEST_TMPDIR, n)):
311         with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'r') as r:
312             shutil.rmtree(r.read(), True)
313         os.unlink("{}/keep{}.volume".format(TEST_TMPDIR, n))
314     if os.path.exists(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")):
315         os.remove(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"))
316
317 def stop_keep():
318     _stop_keep(0)
319     _stop_keep(1)
320
321 def run_keep_proxy():
322     stop_keep_proxy()
323
324     admin_token = auth_token('admin')
325     port = find_available_port()
326     env = os.environ.copy()
327     env['ARVADOS_API_TOKEN'] = admin_token
328     kp = subprocess.Popen(
329         ['keepproxy',
330          '-pid={}/keepproxy.pid'.format(TEST_TMPDIR),
331          '-listen=:{}'.format(port)],
332         env=env)
333
334     api = arvados.api(
335         'v1', cache=False,
336         host=os.environ['ARVADOS_API_HOST'],
337         token=admin_token,
338         insecure=True)
339     for d in api.keep_services().list(
340             filters=[['service_type','=','proxy']]).execute()['items']:
341         api.keep_services().delete(uuid=d['uuid']).execute()
342     api.keep_services().create(body={'keep_service': {
343         'service_host': 'localhost',
344         'service_port': port,
345         'service_type': 'proxy',
346         'service_ssl_flag': False,
347     }}).execute()
348     os.environ["ARVADOS_KEEP_PROXY"] = "http://localhost:{}".format(port)
349
350 def stop_keep_proxy():
351     kill_server_pid(os.path.join(TEST_TMPDIR, "keepproxy.pid"), 0)
352
353 def fixture(fix):
354     '''load a fixture yaml file'''
355     with open(os.path.join(SERVICES_SRC_DIR, 'api', "test", "fixtures",
356                            fix + ".yml")) as f:
357         yaml_file = f.read()
358         try:
359           trim_index = yaml_file.index("# Test Helper trims the rest of the file")
360           yaml_file = yaml_file[0:trim_index]
361         except ValueError:
362           pass
363         return yaml.load(yaml_file)
364
365 def auth_token(token_name):
366     return fixture("api_client_authorizations")[token_name]["api_token"]
367
368 def authorize_with(token_name):
369     '''token_name is the symbolic name of the token from the api_client_authorizations fixture'''
370     arvados.config.settings()["ARVADOS_API_TOKEN"] = auth_token(token_name)
371     arvados.config.settings()["ARVADOS_API_HOST"] = os.environ.get("ARVADOS_API_HOST")
372     arvados.config.settings()["ARVADOS_API_HOST_INSECURE"] = "true"
373
374 class TestCaseWithServers(unittest.TestCase):
375     """TestCase to start and stop supporting Arvados servers.
376
377     Define any of MAIN_SERVER, KEEP_SERVER, and/or KEEP_PROXY_SERVER
378     class variables as a dictionary of keyword arguments.  If you do,
379     setUpClass will start the corresponding servers by passing these
380     keyword arguments to the run, run_keep, and/or run_keep_server
381     functions, respectively.  It will also set Arvados environment
382     variables to point to these servers appropriately.  If you don't
383     run a Keep or Keep proxy server, setUpClass will set up a
384     temporary directory for Keep local storage, and set it as
385     KEEP_LOCAL_STORE.
386
387     tearDownClass will stop any servers started, and restore the
388     original environment.
389     """
390     MAIN_SERVER = None
391     KEEP_SERVER = None
392     KEEP_PROXY_SERVER = None
393
394     @staticmethod
395     def _restore_dict(src, dest):
396         for key in dest.keys():
397             if key not in src:
398                 del dest[key]
399         dest.update(src)
400
401     @classmethod
402     def setUpClass(cls):
403         cls._orig_environ = os.environ.copy()
404         cls._orig_config = arvados.config.settings().copy()
405         cls._cleanup_funcs = []
406         os.environ.pop('ARVADOS_KEEP_PROXY', None)
407         os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
408         for server_kwargs, start_func, stop_func in (
409                 (cls.MAIN_SERVER, run, reset),
410                 (cls.KEEP_SERVER, run_keep, stop_keep),
411                 (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy)):
412             if server_kwargs is not None:
413                 start_func(**server_kwargs)
414                 cls._cleanup_funcs.append(stop_func)
415         if (cls.KEEP_SERVER is None) and (cls.KEEP_PROXY_SERVER is None):
416             cls.local_store = tempfile.mkdtemp()
417             os.environ['KEEP_LOCAL_STORE'] = cls.local_store
418             cls._cleanup_funcs.append(
419                 lambda: shutil.rmtree(cls.local_store, ignore_errors=True))
420         else:
421             os.environ.pop('KEEP_LOCAL_STORE', None)
422         arvados.config.initialize()
423
424     @classmethod
425     def tearDownClass(cls):
426         for clean_func in cls._cleanup_funcs:
427             clean_func()
428         cls._restore_dict(cls._orig_environ, os.environ)
429         cls._restore_dict(cls._orig_config, arvados.config.settings())
430
431
432 if __name__ == "__main__":
433     actions = ['start', 'stop',
434                'start_keep', 'stop_keep',
435                'start_keep_proxy', 'stop_keep_proxy']
436     parser = argparse.ArgumentParser()
437     parser.add_argument('action', type=str, help="one of {}".format(actions))
438     parser.add_argument('--auth', type=str, metavar='FIXTURE_NAME', help='Print authorization info for given api_client_authorizations fixture')
439     args = parser.parse_args()
440
441     if args.action == 'start':
442         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
443         run(leave_running_atexit=True)
444         host = os.environ['ARVADOS_API_HOST']
445         if args.auth is not None:
446             token = auth_token(args.auth)
447             print("export ARVADOS_API_TOKEN={}".format(pipes.quote(token)))
448             print("export ARVADOS_API_HOST={}".format(pipes.quote(host)))
449             print("export ARVADOS_API_HOST_INSECURE=true")
450         else:
451             print(host)
452     elif args.action == 'stop':
453         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
454     elif args.action == 'start_keep':
455         run_keep()
456     elif args.action == 'stop_keep':
457         stop_keep()
458     elif args.action == 'start_keep_proxy':
459         run_keep_proxy()
460     elif args.action == 'stop_keep_proxy':
461         stop_keep_proxy()
462     else:
463         print("Unrecognized action '{}'. Actions are: {}.".format(args.action, actions))