10 class ConfigTest(unittest.TestCase):
11 def test_ok_config(self):
14 ARVADOS_API_HOST: xyzzy.example
15 virtual_machine_hostname: foo.shell
16 """, 'servicename', 'xyzzy.example', 'foo.shell')
18 @mock.patch('arvados_pam.config_file')
19 def assertConfig(self, txt, svcname, apihost, shellhost, config_file):
20 configfake = StringIO.StringIO(txt)
21 config_file.side_effect = [configfake]
22 c = arvados_pam.config()
23 self.assertEqual(apihost, c[svcname]['ARVADOS_API_HOST'])
24 self.assertEqual(shellhost, c[svcname]['virtual_machine_hostname'])
26 class AuthTest(unittest.TestCase):
30 'ARVADOS_API_HOST': 'zzzzz.api_host.example',
31 'virtual_machine_hostname': 'testvm2.shell',
37 'token': '3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi',
44 'uuid': 'zzzzz-o0j2j-rah2ya1ohx9xaev',
45 'tail_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
46 'head_uuid': 'zzzzz-2x53u-382brsig8rp3065',
47 'link_class': 'permission',
55 'uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
56 'full_name': 'Active User',
58 'virtual_machines': lambda: {
60 'uuid': 'zzzzz-2x53u-382brsig8rp3065',
61 'hostname': 'testvm2.shell',
68 return arvados_pam.AuthEvent(config=self.config, service='test_service', **self.request).can_login()
70 def test_success(self):
71 self.assertTrue(self.attempt())
73 cfg = self.config['test_service']
74 self.api_client.virtual_machines().list.assert_called_with(
75 filters=[['hostname','=',cfg['virtual_machine_hostname']]])
76 self.api.assert_called_with(
77 'v1', host=cfg['ARVADOS_API_HOST'], token=self.request['token'], cache=None)
78 self.assertEqual(1, len(self.syslogged))
79 for i in ['test_service',
80 self.request['username'],
81 self.config['test_service']['ARVADOS_API_HOST'],
82 self.response['virtual_machines']()['items'][0]['uuid']]:
83 self.assertRegexpMatches(self.syslogged[0], re.escape(i))
84 self.assertRegexpMatches(self.syslogged[0], re.escape(self.request['token'][0:15]), 'token prefix not logged')
85 self.assertNotRegexpMatches(self.syslogged[0], re.escape(self.request['token'][15:30]), 'too much token logged')
87 def test_fail_vm_lookup(self):
88 self.response['virtual_machines'] = self._raise
89 self.assertFalse(self.attempt())
90 self.assertRegexpMatches(self.syslogged[0], 'Test-induced failure')
92 def test_vm_hostname_not_found(self):
93 self.response['virtual_machines'] = lambda: {
97 self.assertFalse(self.attempt())
99 def test_vm_hostname_ambiguous(self):
100 self.response['virtual_machines'] = lambda: {
103 'uuid': 'zzzzz-2x53u-382brsig8rp3065',
104 'hostname': 'testvm2.shell',
107 'uuid': 'zzzzz-2x53u-382brsig8rp3065',
108 'hostname': 'testvm2.shell',
111 'items_available': 2,
113 self.assertFalse(self.attempt())
115 def test_server_ignores_vm_filters(self):
116 self.response['virtual_machines'] = lambda: {
119 'uuid': 'zzzzz-2x53u-382brsig8rp3065',
120 'hostname': 'testvm22.shell', # <-----
123 'items_available': 1,
125 self.assertFalse(self.attempt())
127 def test_fail_user_lookup(self):
128 self.response['users'] = self._raise
129 self.assertFalse(self.attempt())
131 def test_fail_permission_check(self):
132 self.response['links'] = self._raise
133 self.assertFalse(self.attempt())
135 def test_no_login_permission(self):
136 self.response['links'] = lambda: {
139 self.assertFalse(self.attempt())
141 def test_server_ignores_permission_filters(self):
142 self.response['links'] = lambda: {
144 'uuid': 'zzzzz-o0j2j-rah2ya1ohx9xaev',
145 'tail_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
146 'head_uuid': 'zzzzz-2x53u-382brsig8rp3065',
147 'link_class': 'permission',
148 'name': 'CANT_login', # <-----
150 'username': 'active',
154 self.assertFalse(self.attempt())
157 self.config = self.default_config.copy()
158 self.request = self.default_request.copy()
159 self.response = self.default_response.copy()
160 self.api_client = mock.MagicMock(name='api_client')
161 self.api_client.users().current().execute.side_effect = lambda: self.response['users']()
162 self.api_client.virtual_machines().list().execute.side_effect = lambda: self.response['virtual_machines']()
163 self.api_client.links().list().execute.side_effect = lambda: self.response['links']()
164 patcher = mock.patch('arvados.api')
165 self.api = patcher.start()
166 self.addCleanup(patcher.stop)
167 self.api.side_effect = [self.api_client]
170 patcher = mock.patch('syslog.syslog')
171 self.syslog = patcher.start()
172 self.addCleanup(patcher.stop)
173 self.syslog.side_effect = lambda s: self.syslogged.append(s)
175 def _raise(self, exception=Exception("Test-induced failure"), *args, **kwargs):