6934: Add arvados_pam package.
[arvados.git] / sdk / pam / tests / test_pam.py
diff --git a/sdk/pam/tests/test_pam.py b/sdk/pam/tests/test_pam.py
new file mode 100644 (file)
index 0000000..f59a4fd
--- /dev/null
@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+
+import arvados
+import arvados_pam
+import mock
+import StringIO
+import unittest
+
+class ConfigTest(unittest.TestCase):
+    def test_ok_config(self):
+        self.assertConfig(
+            "#comment\nARVADOS_API_HOST=xyzzy.example\nHOSTNAME=foo.shell\n#HOSTNAME=bogus\n",
+            'xyzzy.example',
+            'foo.shell')
+
+    def test_config_missing_apihost(self):
+        with self.assertRaises(KeyError):
+            self.assertConfig('HOSTNAME=foo', '', 'foo')
+
+    def test_config_missing_shellhost(self):
+        with self.assertRaises(KeyError):
+            self.assertConfig('ARVADOS_API_HOST=foo', 'foo', '')
+
+    def test_config_empty_shellhost(self):
+        self.assertConfig("ARVADOS_API_HOST=foo\nHOSTNAME=\n", 'foo', '')
+
+    def test_config_strip_whitespace(self):
+        self.assertConfig(" ARVADOS_API_HOST = foo \n\tHOSTNAME\t=\tbar\t\n", 'foo', 'bar')
+
+    @mock.patch('arvados_pam.config_file')
+    def assertConfig(self, txt, apihost, shellhost, config_file):
+        configfake = StringIO.StringIO(txt)
+        config_file.side_effect = [configfake]
+        c = arvados_pam.config()
+        self.assertEqual(apihost, c['ARVADOS_API_HOST'])
+        self.assertEqual(shellhost, c['HOSTNAME'])
+
+class AuthTest(unittest.TestCase):
+
+    default_request = {
+        'api_host': 'zzzzz.api_host.example',
+        'shell_host': 'testvm2.shell',
+        'token': '3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi',
+        'username': 'active',
+    }
+
+    default_response = {
+        'links': lambda: {
+            'items': [{
+                'uuid': 'zzzzz-o0j2j-rah2ya1ohx9xaev',
+                'tail_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
+                'head_uuid': 'zzzzz-2x53u-382brsig8rp3065',
+                'link_class': 'permission',
+                'name': 'can_login',
+                'properties': {
+                    'username': 'active',
+                },
+            }],
+        },
+        'users': lambda: {
+            'uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
+            'full_name': 'Active User',
+        },
+        'virtual_machines': lambda: {
+            'items': [{
+                'uuid': 'zzzzz-2x53u-382brsig8rp3065',
+                'hostname': 'testvm2.shell',
+            }],
+            'items_available': 1,
+        },
+    }
+
+    def attempt(self):
+        return arvados_pam.AuthEvent('::1', **self.request).can_login()
+
+    def test_success(self):
+        self.assertTrue(self.attempt())
+        self.api_client.virtual_machines().list.assert_called_with(
+            filters=[['hostname','=',self.request['shell_host']]])
+        self.api.assert_called_with(
+            'v1', host=self.request['api_host'], token=self.request['token'], cache=None)
+
+    def test_fail_vm_lookup(self):
+        self.response['virtual_machines'] = self._raise
+        self.assertFalse(self.attempt())
+
+    def test_vm_hostname_not_found(self):
+        self.response['virtual_machines'] = lambda: {
+            'items': [],
+            'items_available': 0,
+        }
+        self.assertFalse(self.attempt())
+
+    def test_vm_hostname_ambiguous(self):
+        self.response['virtual_machines'] = lambda: {
+            'items': [
+                {
+                    'uuid': 'zzzzz-2x53u-382brsig8rp3065',
+                    'hostname': 'testvm2.shell',
+                },
+                {
+                    'uuid': 'zzzzz-2x53u-382brsig8rp3065',
+                    'hostname': 'testvm2.shell',
+                },
+            ],
+            'items_available': 2,
+        }
+        self.assertFalse(self.attempt())
+
+    def test_server_ignores_vm_filters(self):
+        self.response['virtual_machines'] = lambda: {
+            'items': [
+                {
+                    'uuid': 'zzzzz-2x53u-382brsig8rp3065',
+                    'hostname': 'testvm22.shell', # <-----
+                },
+            ],
+            'items_available': 1,
+        }
+        self.assertFalse(self.attempt())
+
+    def test_fail_user_lookup(self):
+        self.response['users'] = self._raise
+        self.assertFalse(self.attempt())
+
+    def test_fail_permission_check(self):
+        self.response['links'] = self._raise
+        self.assertFalse(self.attempt())
+
+    def test_no_login_permission(self):
+        self.response['links'] = lambda: {
+            'items': [],
+        }
+        self.assertFalse(self.attempt())
+
+    def test_server_ignores_permission_filters(self):
+        self.response['links'] = lambda: {
+            'items': [{
+                'uuid': 'zzzzz-o0j2j-rah2ya1ohx9xaev',
+                'tail_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
+                'head_uuid': 'zzzzz-2x53u-382brsig8rp3065',
+                'link_class': 'permission',
+                'name': 'CANT_login', # <-----
+                'properties': {
+                    'username': 'active',
+                },
+            }],
+        }
+        self.assertFalse(self.attempt())
+
+    def setUp(self):
+        self.request = self.default_request.copy()
+        self.response = self.default_response.copy()
+        self.api_client = mock.MagicMock(name='api_client')
+        self.api_client.users().current().execute.side_effect = lambda: self.response['users']()
+        self.api_client.virtual_machines().list().execute.side_effect = lambda: self.response['virtual_machines']()
+        self.api_client.links().list().execute.side_effect = lambda: self.response['links']()
+        patcher = mock.patch('arvados.api')
+        self.api = patcher.start()
+        self.addCleanup(patcher.stop)
+        self.api.side_effect = [self.api_client]
+
+    def _raise(self, exception=Exception("Test-induced failure"), *args, **kwargs):
+        raise exception