1eb436a2c452ac5bd7c8ac7f4d4a1ce546fffe40
[arvados.git] / sdk / pam / tests / test_pam.py
1 #!/usr/bin/env python
2
3 import arvados
4 import arvados_pam
5 import mock
6 import StringIO
7 import unittest
8
class ConfigTest(unittest.TestCase):
    """Check the mapping returned by arvados_pam.config().

    arvados_pam.config_file is replaced with a mock, so the configuration
    text comes from an in-memory buffer supplied by the test.
    """

    def test_ok_config(self):
        # One service section carrying both settings; each must be readable
        # under the service name in the loaded configuration.
        config_text = """servicename:
              ARVADOS_API_HOST: xyzzy.example
              virtual_machine_hostname: foo.shell
            """
        self.assertConfig(
            config_text, 'servicename', 'xyzzy.example', 'foo.shell')

    @mock.patch('arvados_pam.config_file')
    def assertConfig(self, txt, svcname, apihost, shellhost, config_file):
        """Load `txt` as the configuration and verify one service's settings.

        txt       -- configuration text handed back by the mocked config_file
        svcname   -- top-level key to look up in the loaded configuration
        apihost   -- expected ARVADOS_API_HOST value for that service
        shellhost -- expected virtual_machine_hostname value for that service

        `config_file` is the mock injected by the mock.patch decorator;
        callers do not pass it.
        """
        # A one-element side_effect list: the first call to the mock returns
        # the in-memory buffer holding `txt`.
        config_file.side_effect = [StringIO.StringIO(txt)]
        loaded = arvados_pam.config()
        expectations = (
            ('ARVADOS_API_HOST', apihost),
            ('virtual_machine_hostname', shellhost),
        )
        for key, wanted in expectations:
            self.assertEqual(wanted, loaded[svcname][key])
24
class AuthTest(unittest.TestCase):
    """Exercise arvados_pam.AuthEvent(...).can_login() against a mocked API.

    setUp() patches arvados.api so that it hands back a MagicMock client
    whose users/virtual_machines/links calls are answered from
    self.response. Each test swaps one entry of self.response (or leaves
    the defaults alone) and checks the can_login() result.
    """

    # Configuration passed to AuthEvent; 'test_service' is the service name
    # used by attempt().
    default_config = {
        'test_service': {
            'ARVADOS_API_HOST': 'zzzzz.api_host.example',
            'virtual_machine_hostname': 'testvm2.shell',
        }
    }

    # Keyword arguments passed to AuthEvent in addition to config/service.
    default_request = {
        'client_host': '::1',
        'token': '3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi',
        'username': 'active',
    }

    # Canned API replies, keyed by resource. Values are callables so that a
    # test can substitute one that raises (see _raise) instead of returning.
    default_response = {
        'links': lambda: {
            'items': [{
                'uuid': 'zzzzz-o0j2j-rah2ya1ohx9xaev',
                'tail_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
                'head_uuid': 'zzzzz-2x53u-382brsig8rp3065',
                'link_class': 'permission',
                'name': 'can_login',
                'properties': {
                    'username': 'active',
                },
            }],
        },
        'users': lambda: {
            'uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
            'full_name': 'Active User',
        },
        'virtual_machines': lambda: {
            'items': [{
                'uuid': 'zzzzz-2x53u-382brsig8rp3065',
                'hostname': 'testvm2.shell',
            }],
            'items_available': 1,
        },
    }

    def attempt(self):
        """Run one login check with the current config/request and return its result."""
        return arvados_pam.AuthEvent(config=self.config, service='test_service', **self.request).can_login()

    def test_success(self):
        # Default fixtures: login is allowed.
        self.assertTrue(self.attempt())

        # The VM lookup must filter on the configured hostname, and the API
        # client must be built with the configured host and the request token.
        cfg = self.config['test_service']
        self.api_client.virtual_machines().list.assert_called_with(
            filters=[['hostname','=',cfg['virtual_machine_hostname']]])
        self.api.assert_called_with(
            'v1', host=cfg['ARVADOS_API_HOST'], token=self.request['token'], cache=None)

    def test_fail_vm_lookup(self):
        # The virtual_machines call raises.
        self.response['virtual_machines'] = self._raise
        self.assertFalse(self.attempt())

    def test_vm_hostname_not_found(self):
        # The virtual_machines call returns no items.
        self.response['virtual_machines'] = lambda: {
            'items': [],
            'items_available': 0,
        }
        self.assertFalse(self.attempt())

    def test_vm_hostname_ambiguous(self):
        # The virtual_machines call returns two items for the hostname.
        self.response['virtual_machines'] = lambda: {
            'items': [
                {
                    'uuid': 'zzzzz-2x53u-382brsig8rp3065',
                    'hostname': 'testvm2.shell',
                },
                {
                    'uuid': 'zzzzz-2x53u-382brsig8rp3065',
                    'hostname': 'testvm2.shell',
                },
            ],
            'items_available': 2,
        }
        self.assertFalse(self.attempt())

    def test_server_ignores_vm_filters(self):
        # The returned VM's hostname differs from the configured one.
        self.response['virtual_machines'] = lambda: {
            'items': [
                {
                    'uuid': 'zzzzz-2x53u-382brsig8rp3065',
                    'hostname': 'testvm22.shell', # <-----
                },
            ],
            'items_available': 1,
        }
        self.assertFalse(self.attempt())

    def test_fail_user_lookup(self):
        # The users call raises.
        self.response['users'] = self._raise
        self.assertFalse(self.attempt())

    def test_fail_permission_check(self):
        # The links call raises.
        self.response['links'] = self._raise
        self.assertFalse(self.attempt())

    def test_no_login_permission(self):
        # The links call returns no items.
        self.response['links'] = lambda: {
            'items': [],
        }
        self.assertFalse(self.attempt())

    def test_server_ignores_permission_filters(self):
        # The returned link's name is not 'can_login'.
        self.response['links'] = lambda: {
            'items': [{
                'uuid': 'zzzzz-o0j2j-rah2ya1ohx9xaev',
                'tail_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
                'head_uuid': 'zzzzz-2x53u-382brsig8rp3065',
                'link_class': 'permission',
                'name': 'CANT_login', # <-----
                'properties': {
                    'username': 'active',
                },
            }],
        }
        self.assertFalse(self.attempt())

    def setUp(self):
        """Build per-test fixtures and patch arvados.api with a mock client."""
        # Shallow copies are enough here: the tests in this class only
        # replace top-level entries, they never mutate the nested values.
        self.config = self.default_config.copy()
        self.request = self.default_request.copy()
        self.response = self.default_response.copy()
        self.api_client = mock.MagicMock(name='api_client')
        # Each execute() looks up self.response at call time, so a test can
        # swap in a different reply after setUp has run.
        self.api_client.users().current().execute.side_effect = lambda: self.response['users']()
        self.api_client.virtual_machines().list().execute.side_effect = lambda: self.response['virtual_machines']()
        self.api_client.links().list().execute.side_effect = lambda: self.response['links']()
        patcher = mock.patch('arvados.api')
        self.api = patcher.start()
        self.addCleanup(patcher.stop)
        # One-element side_effect list: the first arvados.api(...) call
        # returns the mock client.
        self.api.side_effect = [self.api_client]

    def _raise(self, exception=None, *args, **kwargs):
        """Raise `exception`; used as a stand-in API reply that fails.

        exception -- the exception to raise. When omitted (or None), a new
                     Exception("Test-induced failure") is created for this
                     call, so no exception instance is shared between calls.
        Any further positional or keyword arguments are accepted and ignored.
        """
        if exception is None:
            exception = Exception("Test-induced failure")
        raise exception