6934: Add arvados_pam package.
[arvados.git] / sdk / pam / tests / test_pam.py
1 #!/usr/bin/env python
2
3 import arvados
4 import arvados_pam
5 import mock
6 import StringIO
7 import unittest
8
class ConfigTest(unittest.TestCase):
    """Checks the values arvados_pam.config() produces for various inputs.

    arvados_pam.config_file is patched in assertConfig so that calling it
    yields an in-memory StringIO holding the config text under test.
    """

    def test_ok_config(self):
        # Comment lines, including a commented-out HOSTNAME entry, must
        # not influence the values that come back.
        config_text = (
            "#comment\n"
            "ARVADOS_API_HOST=xyzzy.example\n"
            "HOSTNAME=foo.shell\n"
            "#HOSTNAME=bogus\n"
        )
        self.assertConfig(config_text, 'xyzzy.example', 'foo.shell')

    def test_config_missing_apihost(self):
        # Without an ARVADOS_API_HOST entry the helper must end in KeyError.
        self.assertRaises(
            KeyError, self.assertConfig, 'HOSTNAME=foo', '', 'foo')

    def test_config_missing_shellhost(self):
        # Without a HOSTNAME entry the helper must end in KeyError.
        self.assertRaises(
            KeyError, self.assertConfig, 'ARVADOS_API_HOST=foo', 'foo', '')

    def test_config_empty_shellhost(self):
        # A HOSTNAME key with nothing after '=' yields an empty string
        # rather than an error.
        config_text = (
            "ARVADOS_API_HOST=foo\n"
            "HOSTNAME=\n"
        )
        self.assertConfig(config_text, 'foo', '')

    def test_config_strip_whitespace(self):
        # Spaces and tabs around keys and values are not part of either.
        config_text = (
            " ARVADOS_API_HOST = foo \n"
            "\tHOSTNAME\t=\tbar\t\n"
        )
        self.assertConfig(config_text, 'foo', 'bar')

    @mock.patch('arvados_pam.config_file')
    def assertConfig(self, txt, apihost, shellhost, config_file):
        # config_file is the mock injected by the patch decorator. Its
        # side_effect list holds a single entry, so its first call returns
        # the fake file and there is nothing left for a second call.
        config_file.side_effect = [StringIO.StringIO(txt)]
        parsed = arvados_pam.config()
        # Keys are checked in this order; a missing key raises KeyError
        # from the lookup itself.
        expectations = (
            ('ARVADOS_API_HOST', apihost),
            ('HOSTNAME', shellhost),
        )
        for key, expected in expectations:
            self.assertEqual(expected, parsed[key])
37
class AuthTest(unittest.TestCase):
    """Exercises arvados_pam.AuthEvent.can_login() against a mocked API.

    setUp replaces arvados.api with a mock whose users, virtual_machines
    and links calls answer from self.response. Each test adjusts
    self.response as needed and checks the resulting login verdict.
    """

    # Keyword arguments handed to AuthEvent by attempt().
    default_request = {
        'api_host': 'zzzzz.api_host.example',
        'shell_host': 'testvm2.shell',
        'token': '3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi',
        'username': 'active',
    }

    # Canned API replies keyed by endpoint name. Every value is a
    # zero-argument callable, so a test can substitute one that raises.
    default_response = {
        'links': lambda: {
            'items': [{
                'uuid': 'zzzzz-o0j2j-rah2ya1ohx9xaev',
                'tail_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
                'head_uuid': 'zzzzz-2x53u-382brsig8rp3065',
                'link_class': 'permission',
                'name': 'can_login',
                'properties': {
                    'username': 'active',
                },
            }],
        },
        'users': lambda: {
            'uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
            'full_name': 'Active User',
        },
        'virtual_machines': lambda: {
            'items': [{
                'uuid': 'zzzzz-2x53u-382brsig8rp3065',
                'hostname': 'testvm2.shell',
            }],
            'items_available': 1,
        },
    }

    def setUp(self):
        # Shallow per-test copies: tests replace whole top-level entries,
        # which leaves the class-level defaults untouched.
        self.request = self.default_request.copy()
        self.response = self.default_response.copy()

        self.api_client = mock.MagicMock(name='api_client')
        endpoints = (
            ('users', self.api_client.users().current()),
            ('virtual_machines', self.api_client.virtual_machines().list()),
            ('links', self.api_client.links().list()),
        )
        for key, request_mock in endpoints:
            request_mock.execute.side_effect = self._responder(key)

        api_patch = mock.patch('arvados.api')
        self.api = api_patch.start()
        self.addCleanup(api_patch.stop)
        # Only a single arvados.api() call is provisioned per test.
        self.api.side_effect = [self.api_client]

    def _responder(self, key):
        # Build the execute() stand-in for one endpoint. self.response is
        # consulted at call time, so overrides made after setUp apply.
        def respond():
            return self.response[key]()
        return respond

    def _raise(self, exception=Exception("Test-induced failure"), *args, **kwargs):
        # Drop-in replacement for a response callable that fails.
        raise exception

    def attempt(self):
        # Run one login check for self.request from address '::1'.
        event = arvados_pam.AuthEvent('::1', **self.request)
        return event.can_login()

    def test_success(self):
        self.assertTrue(self.attempt())
        # The VM lookup must filter on the requested shell host ...
        expected_filters = [['hostname', '=', self.request['shell_host']]]
        vm_list = self.api_client.virtual_machines().list
        vm_list.assert_called_with(filters=expected_filters)
        # ... and the API client must be built from the request's host
        # and token.
        self.api.assert_called_with(
            'v1',
            host=self.request['api_host'],
            token=self.request['token'],
            cache=None)

    def test_fail_vm_lookup(self):
        self.response['virtual_machines'] = self._raise
        self.assertFalse(self.attempt())

    def test_vm_hostname_not_found(self):
        def no_machines():
            return {
                'items': [],
                'items_available': 0,
            }
        self.response['virtual_machines'] = no_machines
        self.assertFalse(self.attempt())

    def test_vm_hostname_ambiguous(self):
        def two_machines():
            return {
                'items': [
                    {
                        'uuid': 'zzzzz-2x53u-382brsig8rp3065',
                        'hostname': 'testvm2.shell',
                    },
                    {
                        'uuid': 'zzzzz-2x53u-382brsig8rp3065',
                        'hostname': 'testvm2.shell',
                    },
                ],
                'items_available': 2,
            }
        self.response['virtual_machines'] = two_machines
        self.assertFalse(self.attempt())

    def test_server_ignores_vm_filters(self):
        def wrong_machine():
            return {
                'items': [
                    {
                        'uuid': 'zzzzz-2x53u-382brsig8rp3065',
                        # Deliberately not the requested 'testvm2.shell'.
                        'hostname': 'testvm22.shell',
                    },
                ],
                'items_available': 1,
            }
        self.response['virtual_machines'] = wrong_machine
        self.assertFalse(self.attempt())

    def test_fail_user_lookup(self):
        self.response['users'] = self._raise
        self.assertFalse(self.attempt())

    def test_fail_permission_check(self):
        self.response['links'] = self._raise
        self.assertFalse(self.attempt())

    def test_no_login_permission(self):
        def no_links():
            return {
                'items': [],
            }
        self.response['links'] = no_links
        self.assertFalse(self.attempt())

    def test_server_ignores_permission_filters(self):
        def wrong_link():
            return {
                'items': [{
                    'uuid': 'zzzzz-o0j2j-rah2ya1ohx9xaev',
                    'tail_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
                    'head_uuid': 'zzzzz-2x53u-382brsig8rp3065',
                    'link_class': 'permission',
                    # Deliberately not 'can_login'.
                    'name': 'CANT_login',
                    'properties': {
                        'username': 'active',
                    },
                }],
            }
        self.response['links'] = wrong_link
        self.assertFalse(self.attempt())