5562: Use static method. Fixes "TypeError: _socket_open() takes exactly 5 arguments...
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "syscall"
9         "testing"
10         "time"
11 )
12
13 func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
14         d, err := ioutil.TempDir("", "volume_test")
15         if err != nil {
16                 t.Fatal(err)
17         }
18         return MakeUnixVolume(d, serialize, readonly)
19 }
20
21 func _teardown(v *UnixVolume) {
22         if v.queue != nil {
23                 close(v.queue)
24         }
25         os.RemoveAll(v.root)
26 }
27
28 // store writes a Keep block directly into a UnixVolume, for testing
29 // UnixVolume methods.
30 //
31 func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
32         blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
33         if err := os.MkdirAll(blockdir, 0755); err != nil {
34                 t.Fatal(err)
35         }
36
37         blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
38         if f, err := os.Create(blockpath); err == nil {
39                 f.Write(block)
40                 f.Close()
41         } else {
42                 t.Fatal(err)
43         }
44 }
45
46 func TestGet(t *testing.T) {
47         v := TempUnixVolume(t, false, false)
48         defer _teardown(v)
49         _store(t, v, TEST_HASH, TEST_BLOCK)
50
51         buf, err := v.Get(TEST_HASH)
52         if err != nil {
53                 t.Error(err)
54         }
55         if bytes.Compare(buf, TEST_BLOCK) != 0 {
56                 t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
57         }
58 }
59
60 func TestGetNotFound(t *testing.T) {
61         v := TempUnixVolume(t, false, false)
62         defer _teardown(v)
63         _store(t, v, TEST_HASH, TEST_BLOCK)
64
65         buf, err := v.Get(TEST_HASH_2)
66         switch {
67         case os.IsNotExist(err):
68                 break
69         case err == nil:
70                 t.Errorf("Read should have failed, returned %s", string(buf))
71         default:
72                 t.Errorf("Read expected ErrNotExist, got: %s", err)
73         }
74 }
75
76 func TestPut(t *testing.T) {
77         v := TempUnixVolume(t, false, false)
78         defer _teardown(v)
79
80         err := v.Put(TEST_HASH, TEST_BLOCK)
81         if err != nil {
82                 t.Error(err)
83         }
84         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
85         if buf, err := ioutil.ReadFile(p); err != nil {
86                 t.Error(err)
87         } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
88                 t.Errorf("Write should have stored %s, did store %s",
89                         string(TEST_BLOCK), string(buf))
90         }
91 }
92
93 func TestPutBadVolume(t *testing.T) {
94         v := TempUnixVolume(t, false, false)
95         defer _teardown(v)
96
97         os.Chmod(v.root, 000)
98         err := v.Put(TEST_HASH, TEST_BLOCK)
99         if err == nil {
100                 t.Error("Write should have failed")
101         }
102 }
103
104 func TestUnixVolumeReadonly(t *testing.T) {
105         v := TempUnixVolume(t, false, false)
106         defer _teardown(v)
107
108         // First write something before marking readonly
109         err := v.Put(TEST_HASH, TEST_BLOCK)
110         if err != nil {
111                 t.Error("got err %v, expected nil", err)
112         }
113
114         v.readonly = true
115
116         _, err = v.Get(TEST_HASH)
117         if err != nil {
118                 t.Error("got err %v, expected nil", err)
119         }
120
121         err = v.Put(TEST_HASH, TEST_BLOCK)
122         if err != MethodDisabledError {
123                 t.Error("got err %v, expected MethodDisabledError", err)
124         }
125
126         err = v.Touch(TEST_HASH)
127         if err != MethodDisabledError {
128                 t.Error("got err %v, expected MethodDisabledError", err)
129         }
130
131         err = v.Delete(TEST_HASH)
132         if err != MethodDisabledError {
133                 t.Error("got err %v, expected MethodDisabledError", err)
134         }
135 }
136
137 // TestPutTouch
138 //     Test that when applying PUT to a block that already exists,
139 //     the block's modification time is updated.
140 func TestPutTouch(t *testing.T) {
141         v := TempUnixVolume(t, false, false)
142         defer _teardown(v)
143
144         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
145                 t.Error(err)
146         }
147
148         // We'll verify { t0 < threshold < t1 }, where t0 is the
149         // existing block's timestamp on disk before Put() and t1 is
150         // its timestamp after Put().
151         threshold := time.Now().Add(-time.Second)
152
153         // Set the stored block's mtime far enough in the past that we
154         // can see the difference between "timestamp didn't change"
155         // and "timestamp granularity is too low".
156         {
157                 oldtime := time.Now().Add(-20 * time.Second).Unix()
158                 if err := syscall.Utime(v.blockPath(TEST_HASH),
159                         &syscall.Utimbuf{oldtime, oldtime}); err != nil {
160                         t.Error(err)
161                 }
162
163                 // Make sure v.Mtime() agrees the above Utime really worked.
164                 if t0, err := v.Mtime(TEST_HASH); err != nil || t0.IsZero() || !t0.Before(threshold) {
165                         t.Errorf("Setting mtime failed: %v, %v", t0, err)
166                 }
167         }
168
169         // Write the same block again.
170         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
171                 t.Error(err)
172         }
173
174         // Verify threshold < t1
175         t1, err := v.Mtime(TEST_HASH)
176         if err != nil {
177                 t.Error(err)
178         }
179         if t1.Before(threshold) {
180                 t.Errorf("t1 %v must be >= threshold %v after v.Put ",
181                         t1, threshold)
182         }
183 }
184
185 // Serialization tests: launch a bunch of concurrent
186 //
187 // TODO(twp): show that the underlying Read/Write operations executed
188 // serially and not concurrently. The easiest way to do this is
189 // probably to activate verbose or debug logging, capture log output
190 // and examine it to confirm that Reads and Writes did not overlap.
191 //
192 // TODO(twp): a proper test of I/O serialization requires that a
193 // second request start while the first one is still underway.
194 // Guaranteeing that the test behaves this way requires some tricky
195 // synchronization and mocking.  For now we'll just launch a bunch of
196 // requests simultaenously in goroutines and demonstrate that they
197 // return accurate results.
198 //
199 func TestGetSerialized(t *testing.T) {
200         // Create a volume with I/O serialization enabled.
201         v := TempUnixVolume(t, true, false)
202         defer _teardown(v)
203
204         _store(t, v, TEST_HASH, TEST_BLOCK)
205         _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
206         _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
207
208         sem := make(chan int)
209         go func(sem chan int) {
210                 buf, err := v.Get(TEST_HASH)
211                 if err != nil {
212                         t.Errorf("err1: %v", err)
213                 }
214                 if bytes.Compare(buf, TEST_BLOCK) != 0 {
215                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
216                 }
217                 sem <- 1
218         }(sem)
219
220         go func(sem chan int) {
221                 buf, err := v.Get(TEST_HASH_2)
222                 if err != nil {
223                         t.Errorf("err2: %v", err)
224                 }
225                 if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
226                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
227                 }
228                 sem <- 1
229         }(sem)
230
231         go func(sem chan int) {
232                 buf, err := v.Get(TEST_HASH_3)
233                 if err != nil {
234                         t.Errorf("err3: %v", err)
235                 }
236                 if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
237                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
238                 }
239                 sem <- 1
240         }(sem)
241
242         // Wait for all goroutines to finish
243         for done := 0; done < 3; {
244                 done += <-sem
245         }
246 }
247
248 func TestPutSerialized(t *testing.T) {
249         // Create a volume with I/O serialization enabled.
250         v := TempUnixVolume(t, true, false)
251         defer _teardown(v)
252
253         sem := make(chan int)
254         go func(sem chan int) {
255                 err := v.Put(TEST_HASH, TEST_BLOCK)
256                 if err != nil {
257                         t.Errorf("err1: %v", err)
258                 }
259                 sem <- 1
260         }(sem)
261
262         go func(sem chan int) {
263                 err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
264                 if err != nil {
265                         t.Errorf("err2: %v", err)
266                 }
267                 sem <- 1
268         }(sem)
269
270         go func(sem chan int) {
271                 err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
272                 if err != nil {
273                         t.Errorf("err3: %v", err)
274                 }
275                 sem <- 1
276         }(sem)
277
278         // Wait for all goroutines to finish
279         for done := 0; done < 2; {
280                 done += <-sem
281         }
282
283         // Double check that we actually wrote the blocks we expected to write.
284         buf, err := v.Get(TEST_HASH)
285         if err != nil {
286                 t.Errorf("Get #1: %v", err)
287         }
288         if bytes.Compare(buf, TEST_BLOCK) != 0 {
289                 t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
290         }
291
292         buf, err = v.Get(TEST_HASH_2)
293         if err != nil {
294                 t.Errorf("Get #2: %v", err)
295         }
296         if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
297                 t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
298         }
299
300         buf, err = v.Get(TEST_HASH_3)
301         if err != nil {
302                 t.Errorf("Get #3: %v", err)
303         }
304         if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
305                 t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
306         }
307 }
308
309 func TestIsFull(t *testing.T) {
310         v := TempUnixVolume(t, false, false)
311         defer _teardown(v)
312
313         full_path := v.root + "/full"
314         now := fmt.Sprintf("%d", time.Now().Unix())
315         os.Symlink(now, full_path)
316         if !v.IsFull() {
317                 t.Errorf("%s: claims not to be full", v)
318         }
319         os.Remove(full_path)
320
321         // Test with an expired /full link.
322         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
323         os.Symlink(expired, full_path)
324         if v.IsFull() {
325                 t.Errorf("%s: should no longer be full", v)
326         }
327 }