3021: Check whether a randomly selected port is available before using it.
[arvados.git] / sdk / python / tests / run_test_server.py
1 #!/usr/bin/env python
2
3 import argparse
4 import atexit
5 import httplib2
6 import os
7 import pipes
8 import random
9 import re
10 import shutil
11 import signal
12 import subprocess
13 import string
14 import sys
15 import tempfile
16 import time
17 import unittest
18 import yaml
19
20 MY_DIRNAME = os.path.dirname(os.path.realpath(__file__))
21 if __name__ == '__main__' and os.path.exists(
22       os.path.join(MY_DIRNAME, '..', 'arvados', '__init__.py')):
23     # We're being launched to support another test suite.
24     # Add the Python SDK source to the library path.
25     sys.path.insert(1, os.path.dirname(MY_DIRNAME))
26
27 import arvados.api
28 import arvados.config
29
30 ARVADOS_DIR = os.path.realpath(os.path.join(MY_DIRNAME, '../../..'))
31 SERVICES_SRC_DIR = os.path.join(ARVADOS_DIR, 'services')
32 SERVER_PID_PATH = 'tmp/pids/test-server.pid'
33 if 'GOPATH' in os.environ:
34     gopaths = os.environ['GOPATH'].split(':')
35     gobins = [os.path.join(path, 'bin') for path in gopaths]
36     os.environ['PATH'] = ':'.join(gobins) + ':' + os.environ['PATH']
37
38 TEST_TMPDIR = os.path.join(ARVADOS_DIR, 'tmp')
39 if not os.path.exists(TEST_TMPDIR):
40     os.mkdir(TEST_TMPDIR)
41
42 my_api_host = None
43
44 def find_server_pid(PID_PATH, wait=10):
45     now = time.time()
46     timeout = now + wait
47     good_pid = False
48     while (not good_pid) and (now <= timeout):
49         time.sleep(0.2)
50         try:
51             with open(PID_PATH, 'r') as f:
52                 server_pid = int(f.read())
53             good_pid = (os.kill(server_pid, 0) is None)
54         except IOError:
55             good_pid = False
56         except OSError:
57             good_pid = False
58         now = time.time()
59
60     if not good_pid:
61         return None
62
63     return server_pid
64
65 def kill_server_pid(pidfile, wait=10, passenger_root=False):
66     # Must re-import modules in order to work during atexit
67     import os
68     import signal
69     import subprocess
70     import time
71     try:
72         if passenger_root:
73             # First try to shut down nicely
74             restore_cwd = os.getcwd()
75             os.chdir(passenger_root)
76             subprocess.call([
77                 'bundle', 'exec', 'passenger', 'stop', '--pid-file', pidfile])
78             os.chdir(restore_cwd)
79         now = time.time()
80         timeout = now + wait
81         with open(pidfile, 'r') as f:
82             server_pid = int(f.read())
83         while now <= timeout:
84             if not passenger_root or timeout - now < wait / 2:
85                 # Half timeout has elapsed. Start sending SIGTERM
86                 os.kill(server_pid, signal.SIGTERM)
87             # Raise OSError if process has disappeared
88             os.getpgid(server_pid)
89             time.sleep(0.1)
90             now = time.time()
91     except IOError:
92         pass
93     except OSError:
94         pass
95
96 def find_available_port():
97     """Return a port number that is not in use right now.
98
99     Some opportunity for races here, but it's better than choosing
100     something at random and not checking at all. If all of our servers
101     (hey Passenger) knew that listening on port 0 was a thing, the OS
102     would take care of the races, and this wouldn't be needed at all.
103     """
104     port = None
105     while port is None:
106         port = random.randint(20000, 40000)
107         port_hex = ':%04x ' % port
108         try:
109             with open('/proc/net/tcp', 'r') as f:
110                 for line in f:
111                     if 0 <= string.find(line, port_hex):
112                         port = None
113                         break
114         except OSError:
115             # This isn't going so well. Just use the random port.
116             pass
117         except IOError:
118             pass
119     return port
120
121 def run(leave_running_atexit=False):
122     """Ensure an API server is running, and ARVADOS_API_* env vars have
123     admin credentials for it.
124
125     If ARVADOS_TEST_API_HOST is set, a parent process has started a
126     test server for us to use: we just need to reset() it using the
127     admin token fixture.
128
129     If a previous call to run() started a new server process, and it
130     is still running, we just need to reset() it to fixture state and
131     return.
132
133     If neither of those options work out, we'll really start a new
134     server.
135     """
136     global my_api_host
137
138     # Delete cached discovery document.
139     shutil.rmtree(arvados.http_cache('discovery'))
140
141     os.environ['ARVADOS_API_TOKEN'] = auth_token('admin')
142     os.environ['ARVADOS_API_HOST_INSECURE'] = 'true'
143
144     pid_file = os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH)
145     pid_file_ok = find_server_pid(pid_file, 0)
146
147     existing_api_host = os.environ.get('ARVADOS_TEST_API_HOST', my_api_host)
148     if existing_api_host and pid_file_ok:
149         try:
150             os.environ['ARVADOS_API_HOST'] = existing_api_host
151             reset()
152             return
153         except:
154             pass
155
156     restore_cwd = os.getcwd()
157     api_src_dir = os.path.join(SERVICES_SRC_DIR, 'api')
158     os.chdir(api_src_dir)
159
160     # Either we haven't started a server of our own yet, or it has
161     # died, or we have lost our credentials, or something else is
162     # preventing us from calling reset(). Start a new one.
163
164     if not os.path.exists('tmp/self-signed.pem'):
165         # We assume here that either passenger reports its listening
166         # address as https:/0.0.0.0:port/. If it reports "127.0.0.1"
167         # then the certificate won't match the host and reset() will
168         # fail certificate verification. If it reports "localhost",
169         # clients (notably Python SDK's websocket client) might
170         # resolve localhost as ::1 and then fail to connect.
171         subprocess.check_call([
172             'openssl', 'req', '-new', '-x509', '-nodes',
173             '-out', 'tmp/self-signed.pem',
174             '-keyout', 'tmp/self-signed.key',
175             '-days', '3650',
176             '-subj', '/CN=0.0.0.0'])
177
178     port = find_available_port()
179     env = os.environ.copy()
180     env['RAILS_ENV'] = 'test'
181     env['ARVADOS_WEBSOCKETS'] = 'yes'
182     env.pop('ARVADOS_TEST_API_HOST', None)
183     env.pop('ARVADOS_API_HOST', None)
184     env.pop('ARVADOS_API_HOST_INSECURE', None)
185     env.pop('ARVADOS_API_TOKEN', None)
186     start_msg = subprocess.check_output(
187         ['bundle', 'exec',
188          'passenger', 'start', '-d', '-p{}'.format(port),
189          '--pid-file', os.path.join(os.getcwd(), pid_file),
190          '--log-file', os.path.join(os.getcwd(), 'log/test.log'),
191          '--ssl',
192          '--ssl-certificate', 'tmp/self-signed.pem',
193          '--ssl-certificate-key', 'tmp/self-signed.key'],
194         env=env)
195
196     if not leave_running_atexit:
197         atexit.register(kill_server_pid, pid_file, passenger_root=api_src_dir)
198
199     match = re.search(r'Accessible via: https://(.*?)/', start_msg)
200     if not match:
201         raise Exception(
202             "Passenger did not report endpoint: {}".format(start_msg))
203     my_api_host = match.group(1)
204     os.environ['ARVADOS_API_HOST'] = my_api_host
205
206     # Make sure the server has written its pid file before continuing
207     find_server_pid(pid_file)
208
209     reset()
210     os.chdir(restore_cwd)
211
212 def reset():
213     """Reset the test server to fixture state.
214
215     This resets the ARVADOS_TEST_API_HOST provided by a parent process
216     if any, otherwise the server started by run().
217     """
218     existing_api_host = os.environ.get('ARVADOS_TEST_API_HOST', my_api_host)
219     token = auth_token('admin')
220     httpclient = httplib2.Http(ca_certs=os.path.join(
221         SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem'))
222     httpclient.request(
223         'https://{}/database/reset'.format(existing_api_host),
224         'POST',
225         headers={'Authorization': 'OAuth2 {}'.format(token)})
226
227 def stop(force=False):
228     """Stop the API server, if one is running.
229
230     If force==False, kill it only if we started it ourselves. (This
231     supports the use case where a Python test suite calls run(), but
232     run() just uses the ARVADOS_TEST_API_HOST provided by the parent
233     process, and the test suite cleans up after itself by calling
234     stop(). In this case the test server provided by the parent
235     process should be left alone.)
236
237     If force==True, kill it even if we didn't start it
238     ourselves. (This supports the use case in __main__, where "run"
239     and "stop" happen in different processes.)
240     """
241     global my_api_host
242     if force or my_api_host is not None:
243         kill_server_pid(os.path.join(SERVICES_SRC_DIR, 'api', SERVER_PID_PATH))
244         my_api_host = None
245
246 def _start_keep(n, keep_args):
247     keep0 = tempfile.mkdtemp()
248     port = find_available_port()
249     keep_cmd = ["keepstore",
250                 "-volumes={}".format(keep0),
251                 "-listen=:{}".format(port),
252                 "-pid={}".format("{}/keep{}.pid".format(TEST_TMPDIR, n))]
253
254     for arg, val in keep_args.iteritems():
255         keep_cmd.append("{}={}".format(arg, val))
256
257     kp0 = subprocess.Popen(keep_cmd)
258     with open("{}/keep{}.pid".format(TEST_TMPDIR, n), 'w') as f:
259         f.write(str(kp0.pid))
260
261     with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'w') as f:
262         f.write(keep0)
263
264     return port
265
266 def run_keep(blob_signing_key=None, enforce_permissions=False):
267     stop_keep()
268
269     keep_args = {}
270     if blob_signing_key:
271         with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f:
272             keep_args['--permission-key-file'] = f.name
273             f.write(blob_signing_key)
274     if enforce_permissions:
275         keep_args['--enforce-permissions'] = 'true'
276
277     api = arvados.api(
278         'v1', cache=False,
279         host=os.environ['ARVADOS_API_HOST'],
280         token=os.environ['ARVADOS_API_TOKEN'],
281         insecure=True)
282     for d in api.keep_services().list().execute()['items']:
283         api.keep_services().delete(uuid=d['uuid']).execute()
284     for d in api.keep_disks().list().execute()['items']:
285         api.keep_disks().delete(uuid=d['uuid']).execute()
286
287     for d in range(0, 2):
288         port = _start_keep(d, keep_args)
289         svc = api.keep_services().create(body={'keep_service': {
290             'uuid': 'zzzzz-bi6l4-keepdisk{:07d}'.format(d),
291             'service_host': 'localhost',
292             'service_port': port,
293             'service_type': 'disk',
294             'service_ssl_flag': False,
295         }}).execute()
296         api.keep_disks().create(body={
297             'keep_disk': {'keep_service_uuid': svc['uuid'] }
298         }).execute()
299
300 def _stop_keep(n):
301     kill_server_pid("{}/keep{}.pid".format(TEST_TMPDIR, n), 0)
302     if os.path.exists("{}/keep{}.volume".format(TEST_TMPDIR, n)):
303         with open("{}/keep{}.volume".format(TEST_TMPDIR, n), 'r') as r:
304             shutil.rmtree(r.read(), True)
305         os.unlink("{}/keep{}.volume".format(TEST_TMPDIR, n))
306     if os.path.exists(os.path.join(TEST_TMPDIR, "keep.blob_signing_key")):
307         os.remove(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"))
308
309 def stop_keep():
310     _stop_keep(0)
311     _stop_keep(1)
312
313 def run_keep_proxy():
314     stop_keep_proxy()
315
316     admin_token = auth_token('admin')
317     port = find_available_port()
318     env = os.environ.copy()
319     env['ARVADOS_API_TOKEN'] = admin_token
320     kp = subprocess.Popen(
321         ['keepproxy',
322          '-pid={}/keepproxy.pid'.format(TEST_TMPDIR),
323          '-listen=:{}'.format(port)],
324         env=env)
325
326     api = arvados.api(
327         'v1', cache=False,
328         host=os.environ['ARVADOS_API_HOST'],
329         token=admin_token,
330         insecure=True)
331     for d in api.keep_services().list(
332             filters=[['service_type','=','proxy']]).execute()['items']:
333         api.keep_services().delete(uuid=d['uuid']).execute()
334     api.keep_services().create(body={'keep_service': {
335         'service_host': 'localhost',
336         'service_port': port,
337         'service_type': 'proxy',
338         'service_ssl_flag': False,
339     }}).execute()
340     os.environ["ARVADOS_KEEP_PROXY"] = "http://localhost:{}".format(port)
341
342 def stop_keep_proxy():
343     kill_server_pid(os.path.join(TEST_TMPDIR, "keepproxy.pid"), 0)
344
345 def fixture(fix):
346     '''load a fixture yaml file'''
347     with open(os.path.join(SERVICES_SRC_DIR, 'api', "test", "fixtures",
348                            fix + ".yml")) as f:
349         yaml_file = f.read()
350         try:
351           trim_index = yaml_file.index("# Test Helper trims the rest of the file")
352           yaml_file = yaml_file[0:trim_index]
353         except ValueError:
354           pass
355         return yaml.load(yaml_file)
356
357 def auth_token(token_name):
358     return fixture("api_client_authorizations")[token_name]["api_token"]
359
360 def authorize_with(token_name):
361     '''token_name is the symbolic name of the token from the api_client_authorizations fixture'''
362     arvados.config.settings()["ARVADOS_API_TOKEN"] = auth_token(token_name)
363     arvados.config.settings()["ARVADOS_API_HOST"] = os.environ.get("ARVADOS_API_HOST")
364     arvados.config.settings()["ARVADOS_API_HOST_INSECURE"] = "true"
365
366 class TestCaseWithServers(unittest.TestCase):
367     """TestCase to start and stop supporting Arvados servers.
368
369     Define any of MAIN_SERVER, KEEP_SERVER, and/or KEEP_PROXY_SERVER
370     class variables as a dictionary of keyword arguments.  If you do,
371     setUpClass will start the corresponding servers by passing these
372     keyword arguments to the run, run_keep, and/or run_keep_server
373     functions, respectively.  It will also set Arvados environment
374     variables to point to these servers appropriately.  If you don't
375     run a Keep or Keep proxy server, setUpClass will set up a
376     temporary directory for Keep local storage, and set it as
377     KEEP_LOCAL_STORE.
378
379     tearDownClass will stop any servers started, and restore the
380     original environment.
381     """
382     MAIN_SERVER = None
383     KEEP_SERVER = None
384     KEEP_PROXY_SERVER = None
385
386     @staticmethod
387     def _restore_dict(src, dest):
388         for key in dest.keys():
389             if key not in src:
390                 del dest[key]
391         dest.update(src)
392
393     @classmethod
394     def setUpClass(cls):
395         cls._orig_environ = os.environ.copy()
396         cls._orig_config = arvados.config.settings().copy()
397         cls._cleanup_funcs = []
398         os.environ.pop('ARVADOS_KEEP_PROXY', None)
399         os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
400         for server_kwargs, start_func, stop_func in (
401                 (cls.MAIN_SERVER, run, reset),
402                 (cls.KEEP_SERVER, run_keep, stop_keep),
403                 (cls.KEEP_PROXY_SERVER, run_keep_proxy, stop_keep_proxy)):
404             if server_kwargs is not None:
405                 start_func(**server_kwargs)
406                 cls._cleanup_funcs.append(stop_func)
407         if (cls.KEEP_SERVER is None) and (cls.KEEP_PROXY_SERVER is None):
408             cls.local_store = tempfile.mkdtemp()
409             os.environ['KEEP_LOCAL_STORE'] = cls.local_store
410             cls._cleanup_funcs.append(
411                 lambda: shutil.rmtree(cls.local_store, ignore_errors=True))
412         else:
413             os.environ.pop('KEEP_LOCAL_STORE', None)
414         arvados.config.initialize()
415
416     @classmethod
417     def tearDownClass(cls):
418         for clean_func in cls._cleanup_funcs:
419             clean_func()
420         cls._restore_dict(cls._orig_environ, os.environ)
421         cls._restore_dict(cls._orig_config, arvados.config.settings())
422
423
424 if __name__ == "__main__":
425     actions = ['start', 'stop',
426                'start_keep', 'stop_keep',
427                'start_keep_proxy', 'stop_keep_proxy']
428     parser = argparse.ArgumentParser()
429     parser.add_argument('action', type=str, help="one of {}".format(actions))
430     parser.add_argument('--auth', type=str, metavar='FIXTURE_NAME', help='Print authorization info for given api_client_authorizations fixture')
431     args = parser.parse_args()
432
433     if args.action == 'start':
434         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
435         run(leave_running_atexit=True)
436         host = os.environ['ARVADOS_API_HOST']
437         if args.auth is not None:
438             token = auth_token(args.auth)
439             print("export ARVADOS_API_TOKEN={}".format(pipes.quote(token)))
440             print("export ARVADOS_API_HOST={}".format(pipes.quote(host)))
441             print("export ARVADOS_API_HOST_INSECURE=true")
442         else:
443             print(host)
444     elif args.action == 'stop':
445         stop(force=('ARVADOS_TEST_API_HOST' not in os.environ))
446     elif args.action == 'start_keep':
447         run_keep()
448     elif args.action == 'stop_keep':
449         stop_keep()
450     elif args.action == 'start_keep_proxy':
451         run_keep_proxy()
452     elif args.action == 'stop_keep_proxy':
453         stop_keep_proxy()
454     else:
455         print("Unrecognized action '{}'. Actions are: {}.".format(args.action, actions))