6934: Improve and test logging
[arvados.git] / sdk / pam / tests / test_pam.py
1 #!/usr/bin/env python
2
3 import arvados
4 import arvados_pam
5 import mock
6 import re
7 import StringIO
8 import unittest
9
10 class ConfigTest(unittest.TestCase):
11     def test_ok_config(self):
12         self.assertConfig(
13             """servicename:
14               ARVADOS_API_HOST: xyzzy.example
15               virtual_machine_hostname: foo.shell
16             """, 'servicename', 'xyzzy.example', 'foo.shell')
17
18     @mock.patch('arvados_pam.config_file')
19     def assertConfig(self, txt, svcname, apihost, shellhost, config_file):
20         configfake = StringIO.StringIO(txt)
21         config_file.side_effect = [configfake]
22         c = arvados_pam.config()
23         self.assertEqual(apihost, c[svcname]['ARVADOS_API_HOST'])
24         self.assertEqual(shellhost, c[svcname]['virtual_machine_hostname'])
25
26 class AuthTest(unittest.TestCase):
27
28     default_config = {
29         'test_service': {
30             'ARVADOS_API_HOST': 'zzzzz.api_host.example',
31             'virtual_machine_hostname': 'testvm2.shell',
32         }
33     }
34
35     default_request = {
36         'client_host': '::1',
37         'token': '3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi',
38         'username': 'active',
39     }
40
41     default_response = {
42         'links': lambda: {
43             'items': [{
44                 'uuid': 'zzzzz-o0j2j-rah2ya1ohx9xaev',
45                 'tail_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
46                 'head_uuid': 'zzzzz-2x53u-382brsig8rp3065',
47                 'link_class': 'permission',
48                 'name': 'can_login',
49                 'properties': {
50                     'username': 'active',
51                 },
52             }],
53         },
54         'users': lambda: {
55             'uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
56             'full_name': 'Active User',
57         },
58         'virtual_machines': lambda: {
59             'items': [{
60                 'uuid': 'zzzzz-2x53u-382brsig8rp3065',
61                 'hostname': 'testvm2.shell',
62             }],
63             'items_available': 1,
64         },
65     }
66
67     def attempt(self):
68         return arvados_pam.AuthEvent(config=self.config, service='test_service', **self.request).can_login()
69
70     def test_success(self):
71         self.assertTrue(self.attempt())
72
73         cfg = self.config['test_service']
74         self.api_client.virtual_machines().list.assert_called_with(
75             filters=[['hostname','=',cfg['virtual_machine_hostname']]])
76         self.api.assert_called_with(
77             'v1', host=cfg['ARVADOS_API_HOST'], token=self.request['token'], cache=None)
78         self.assertEqual(1, len(self.syslogged))
79         for i in ['test_service',
80                   self.request['username'],
81                   self.config['test_service']['ARVADOS_API_HOST'],
82                   self.response['virtual_machines']()['items'][0]['uuid']]:
83             self.assertRegexpMatches(self.syslogged[0], re.escape(i))
84         self.assertRegexpMatches(self.syslogged[0], re.escape(self.request['token'][0:15]), 'token prefix not logged')
85         self.assertNotRegexpMatches(self.syslogged[0], re.escape(self.request['token'][15:30]), 'too much token logged')
86
87     def test_fail_vm_lookup(self):
88         self.response['virtual_machines'] = self._raise
89         self.assertFalse(self.attempt())
90         self.assertRegexpMatches(self.syslogged[0], 'Test-induced failure')
91
92     def test_vm_hostname_not_found(self):
93         self.response['virtual_machines'] = lambda: {
94             'items': [],
95             'items_available': 0,
96         }
97         self.assertFalse(self.attempt())
98
99     def test_vm_hostname_ambiguous(self):
100         self.response['virtual_machines'] = lambda: {
101             'items': [
102                 {
103                     'uuid': 'zzzzz-2x53u-382brsig8rp3065',
104                     'hostname': 'testvm2.shell',
105                 },
106                 {
107                     'uuid': 'zzzzz-2x53u-382brsig8rp3065',
108                     'hostname': 'testvm2.shell',
109                 },
110             ],
111             'items_available': 2,
112         }
113         self.assertFalse(self.attempt())
114
115     def test_server_ignores_vm_filters(self):
116         self.response['virtual_machines'] = lambda: {
117             'items': [
118                 {
119                     'uuid': 'zzzzz-2x53u-382brsig8rp3065',
120                     'hostname': 'testvm22.shell', # <-----
121                 },
122             ],
123             'items_available': 1,
124         }
125         self.assertFalse(self.attempt())
126
127     def test_fail_user_lookup(self):
128         self.response['users'] = self._raise
129         self.assertFalse(self.attempt())
130
131     def test_fail_permission_check(self):
132         self.response['links'] = self._raise
133         self.assertFalse(self.attempt())
134
135     def test_no_login_permission(self):
136         self.response['links'] = lambda: {
137             'items': [],
138         }
139         self.assertFalse(self.attempt())
140
141     def test_server_ignores_permission_filters(self):
142         self.response['links'] = lambda: {
143             'items': [{
144                 'uuid': 'zzzzz-o0j2j-rah2ya1ohx9xaev',
145                 'tail_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
146                 'head_uuid': 'zzzzz-2x53u-382brsig8rp3065',
147                 'link_class': 'permission',
148                 'name': 'CANT_login', # <-----
149                 'properties': {
150                     'username': 'active',
151                 },
152             }],
153         }
154         self.assertFalse(self.attempt())
155
156     def setUp(self):
157         self.config = self.default_config.copy()
158         self.request = self.default_request.copy()
159         self.response = self.default_response.copy()
160         self.api_client = mock.MagicMock(name='api_client')
161         self.api_client.users().current().execute.side_effect = lambda: self.response['users']()
162         self.api_client.virtual_machines().list().execute.side_effect = lambda: self.response['virtual_machines']()
163         self.api_client.links().list().execute.side_effect = lambda: self.response['links']()
164         patcher = mock.patch('arvados.api')
165         self.api = patcher.start()
166         self.addCleanup(patcher.stop)
167         self.api.side_effect = [self.api_client]
168
169         self.syslogged = []
170         patcher = mock.patch('syslog.syslog')
171         self.syslog = patcher.start()
172         self.addCleanup(patcher.stop)
173         self.syslog.side_effect = lambda s: self.syslogged.append(s)
174
175     def _raise(self, exception=Exception("Test-induced failure"), *args, **kwargs):
176         raise exception