Back out some undesired test changes refs #17817
[arvados.git] / sdk / python / tests / test_util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import os
6 import subprocess
7 import unittest
8
9 import arvados
10 import arvados.util
11
12 class MkdirDashPTest(unittest.TestCase):
13     def setUp(self):
14         try:
15             os.path.mkdir('./tmp')
16         except:
17             pass
18     def tearDown(self):
19         try:
20             os.unlink('./tmp/bar')
21             os.rmdir('./tmp/foo')
22             os.rmdir('./tmp')
23         except:
24             pass
25     def runTest(self):
26         arvados.util.mkdir_dash_p('./tmp/foo')
27         with open('./tmp/bar', 'wb') as f:
28             f.write(b'bar')
29         self.assertRaises(OSError, arvados.util.mkdir_dash_p, './tmp/bar')
30
31
32 class RunCommandTestCase(unittest.TestCase):
33     def test_success(self):
34         stdout, stderr = arvados.util.run_command(['echo', 'test'],
35                                                   stderr=subprocess.PIPE)
36         self.assertEqual("test\n".encode(), stdout)
37         self.assertEqual("".encode(), stderr)
38
39     def test_failure(self):
40         with self.assertRaises(arvados.errors.CommandFailedError):
41             arvados.util.run_command(['false'])
42
43 class KeysetTestHelper:
44     def __init__(self, expect):
45         self.n = 0
46         self.expect = expect
47
48     def fn(self, **kwargs):
49         if self.expect[self.n][0] != kwargs:
50             raise Exception("Didn't match %s != %s" % (self.expect[self.n][0], kwargs))
51         return self
52
53     def execute(self, num_retries):
54         self.n += 1
55         return self.expect[self.n-1][1]
56
57 class KeysetListAllTestCase(unittest.TestCase):
58     def test_empty(self):
59         ks = KeysetTestHelper([[
60             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
61             {"items": []}
62         ]])
63
64         ls = list(arvados.util.keyset_list_all(ks.fn))
65         self.assertEqual(ls, [])
66
67     def test_oneitem(self):
68         ks = KeysetTestHelper([[
69             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
70             {"items": [{"created_at": "1", "uuid": "1"}]}
71         ], [
72             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "1"], ["uuid", ">", "1"]]},
73             {"items": []}
74         ],[
75             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "1"]]},
76             {"items": []}
77         ]])
78
79         ls = list(arvados.util.keyset_list_all(ks.fn))
80         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}])
81
82     def test_onepage2(self):
83         ks = KeysetTestHelper([[
84             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
85             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
86         ], [
87             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
88             {"items": []}
89         ]])
90
91         ls = list(arvados.util.keyset_list_all(ks.fn))
92         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
93
94     def test_onepage3(self):
95         ks = KeysetTestHelper([[
96             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
97             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}]}
98         ], [
99             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "3"], ["uuid", "!=", "3"]]},
100             {"items": []}
101         ]])
102
103         ls = list(arvados.util.keyset_list_all(ks.fn))
104         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "3", "uuid": "3"}])
105
106
107     def test_twopage(self):
108         ks = KeysetTestHelper([[
109             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
110             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
111         ], [
112             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"]]},
113             {"items": [{"created_at": "3", "uuid": "3"}, {"created_at": "4", "uuid": "4"}]}
114         ], [
115             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "4"]]},
116             {"items": []}
117         ]])
118
119         ls = list(arvados.util.keyset_list_all(ks.fn))
120         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
121                               {"created_at": "2", "uuid": "2"},
122                               {"created_at": "3", "uuid": "3"},
123                               {"created_at": "4", "uuid": "4"}
124         ])
125
126     def test_repeated_key(self):
127         ks = KeysetTestHelper([[
128             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": []},
129             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "3"}]}
130         ], [
131             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "3"]]},
132             {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "2", "uuid": "4"}]}
133         ], [
134             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", "=", "2"], ["uuid", ">", "4"]]},
135             {"items": []}
136         ], [
137             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">", "2"]]},
138             {"items": [{"created_at": "3", "uuid": "5"}, {"created_at": "4", "uuid": "6"}]}
139         ], [
140             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "4"], ["uuid", "!=", "6"]]},
141             {"items": []}
142         ],
143         ])
144
145         ls = list(arvados.util.keyset_list_all(ks.fn))
146         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"},
147                               {"created_at": "2", "uuid": "2"},
148                               {"created_at": "2", "uuid": "3"},
149                               {"created_at": "2", "uuid": "4"},
150                               {"created_at": "3", "uuid": "5"},
151                               {"created_at": "4", "uuid": "6"}
152         ])
153
154     def test_onepage_withfilter(self):
155         ks = KeysetTestHelper([[
156             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["foo", ">", "bar"]]},
157             {"items": [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}]}
158         ], [
159             {"limit": 1000, "count": "none", "order": ["created_at asc", "uuid asc"], "filters": [["created_at", ">=", "2"], ["uuid", "!=", "2"], ["foo", ">", "bar"]]},
160             {"items": []}
161         ]])
162
163         ls = list(arvados.util.keyset_list_all(ks.fn, filters=[["foo", ">", "bar"]]))
164         self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
165
166
167     def test_onepage_desc(self):
168         ks = KeysetTestHelper([[
169             {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid asc"], "filters": []},
170             {"items": [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}]}
171         ], [
172             {"limit": 1000, "count": "none", "order": ["created_at desc", "uuid asc"], "filters": [["created_at", "<=", "1"], ["uuid", "!=", "1"]]},
173             {"items": []}
174         ]])
175
176         ls = list(arvados.util.keyset_list_all(ks.fn, ascending=False))
177         self.assertEqual(ls, [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}])