Merge branch 'master' into 3618-column-ordering
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "syscall"
9         "testing"
10         "time"
11 )
12
13 func TempUnixVolume(t *testing.T, serialize bool) UnixVolume {
14         d, err := ioutil.TempDir("", "volume_test")
15         if err != nil {
16                 t.Fatal(err)
17         }
18         return MakeUnixVolume(d, serialize)
19 }
20
21 func _teardown(v UnixVolume) {
22         if v.queue != nil {
23                 close(v.queue)
24         }
25         os.RemoveAll(v.root)
26 }
27
28 // store writes a Keep block directly into a UnixVolume, for testing
29 // UnixVolume methods.
30 //
31 func _store(t *testing.T, vol UnixVolume, filename string, block []byte) {
32         blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
33         if err := os.MkdirAll(blockdir, 0755); err != nil {
34                 t.Fatal(err)
35         }
36
37         blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
38         if f, err := os.Create(blockpath); err == nil {
39                 f.Write(block)
40                 f.Close()
41         } else {
42                 t.Fatal(err)
43         }
44 }
45
46 func TestGet(t *testing.T) {
47         v := TempUnixVolume(t, false)
48         defer _teardown(v)
49         _store(t, v, TEST_HASH, TEST_BLOCK)
50
51         buf, err := v.Get(TEST_HASH)
52         if err != nil {
53                 t.Error(err)
54         }
55         if bytes.Compare(buf, TEST_BLOCK) != 0 {
56                 t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
57         }
58 }
59
60 func TestGetNotFound(t *testing.T) {
61         v := TempUnixVolume(t, false)
62         defer _teardown(v)
63         _store(t, v, TEST_HASH, TEST_BLOCK)
64
65         buf, err := v.Get(TEST_HASH_2)
66         switch {
67         case os.IsNotExist(err):
68                 break
69         case err == nil:
70                 t.Errorf("Read should have failed, returned %s", string(buf))
71         default:
72                 t.Errorf("Read expected ErrNotExist, got: %s", err)
73         }
74 }
75
76 func TestPut(t *testing.T) {
77         v := TempUnixVolume(t, false)
78         defer _teardown(v)
79
80         err := v.Put(TEST_HASH, TEST_BLOCK)
81         if err != nil {
82                 t.Error(err)
83         }
84         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
85         if buf, err := ioutil.ReadFile(p); err != nil {
86                 t.Error(err)
87         } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
88                 t.Errorf("Write should have stored %s, did store %s",
89                         string(TEST_BLOCK), string(buf))
90         }
91 }
92
93 func TestPutBadVolume(t *testing.T) {
94         v := TempUnixVolume(t, false)
95         defer _teardown(v)
96
97         os.Chmod(v.root, 000)
98         err := v.Put(TEST_HASH, TEST_BLOCK)
99         if err == nil {
100                 t.Error("Write should have failed")
101         }
102 }
103
104 // TestPutTouch
105 //     Test that when applying PUT to a block that already exists,
106 //     the block's modification time is updated.
107 func TestPutTouch(t *testing.T) {
108         v := TempUnixVolume(t, false)
109         defer _teardown(v)
110
111         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
112                 t.Error(err)
113         }
114
115         // We'll verify { t0 < threshold < t1 }, where t0 is the
116         // existing block's timestamp on disk before Put() and t1 is
117         // its timestamp after Put().
118         threshold := time.Now().Add(-time.Second)
119
120         // Set the stored block's mtime far enough in the past that we
121         // can see the difference between "timestamp didn't change"
122         // and "timestamp granularity is too low".
123         {
124                 oldtime := time.Now().Add(-20 * time.Second).Unix()
125                 if err := syscall.Utime(v.blockPath(TEST_HASH),
126                         &syscall.Utimbuf{oldtime, oldtime}); err != nil {
127                         t.Error(err)
128                 }
129
130                 // Make sure v.Mtime() agrees the above Utime really worked.
131                 if t0, err := v.Mtime(TEST_HASH); err != nil || t0.IsZero() || !t0.Before(threshold) {
132                         t.Errorf("Setting mtime failed: %v, %v", t0, err)
133                 }
134         }
135
136         // Write the same block again.
137         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
138                 t.Error(err)
139         }
140
141         // Verify threshold < t1
142         t1, err := v.Mtime(TEST_HASH)
143         if err != nil {
144                 t.Error(err)
145         }
146         if t1.Before(threshold) {
147                 t.Errorf("t1 %v must be >= threshold %v after v.Put ",
148                         t1, threshold)
149         }
150 }
151
152 // Serialization tests: launch a bunch of concurrent
153 //
154 // TODO(twp): show that the underlying Read/Write operations executed
155 // serially and not concurrently. The easiest way to do this is
156 // probably to activate verbose or debug logging, capture log output
157 // and examine it to confirm that Reads and Writes did not overlap.
158 //
159 // TODO(twp): a proper test of I/O serialization requires that a
160 // second request start while the first one is still underway.
161 // Guaranteeing that the test behaves this way requires some tricky
162 // synchronization and mocking.  For now we'll just launch a bunch of
163 // requests simultaenously in goroutines and demonstrate that they
164 // return accurate results.
165 //
166 func TestGetSerialized(t *testing.T) {
167         // Create a volume with I/O serialization enabled.
168         v := TempUnixVolume(t, true)
169         defer _teardown(v)
170
171         _store(t, v, TEST_HASH, TEST_BLOCK)
172         _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
173         _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
174
175         sem := make(chan int)
176         go func(sem chan int) {
177                 buf, err := v.Get(TEST_HASH)
178                 if err != nil {
179                         t.Errorf("err1: %v", err)
180                 }
181                 if bytes.Compare(buf, TEST_BLOCK) != 0 {
182                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
183                 }
184                 sem <- 1
185         }(sem)
186
187         go func(sem chan int) {
188                 buf, err := v.Get(TEST_HASH_2)
189                 if err != nil {
190                         t.Errorf("err2: %v", err)
191                 }
192                 if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
193                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
194                 }
195                 sem <- 1
196         }(sem)
197
198         go func(sem chan int) {
199                 buf, err := v.Get(TEST_HASH_3)
200                 if err != nil {
201                         t.Errorf("err3: %v", err)
202                 }
203                 if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
204                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
205                 }
206                 sem <- 1
207         }(sem)
208
209         // Wait for all goroutines to finish
210         for done := 0; done < 3; {
211                 done += <-sem
212         }
213 }
214
215 func TestPutSerialized(t *testing.T) {
216         // Create a volume with I/O serialization enabled.
217         v := TempUnixVolume(t, true)
218         defer _teardown(v)
219
220         sem := make(chan int)
221         go func(sem chan int) {
222                 err := v.Put(TEST_HASH, TEST_BLOCK)
223                 if err != nil {
224                         t.Errorf("err1: %v", err)
225                 }
226                 sem <- 1
227         }(sem)
228
229         go func(sem chan int) {
230                 err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
231                 if err != nil {
232                         t.Errorf("err2: %v", err)
233                 }
234                 sem <- 1
235         }(sem)
236
237         go func(sem chan int) {
238                 err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
239                 if err != nil {
240                         t.Errorf("err3: %v", err)
241                 }
242                 sem <- 1
243         }(sem)
244
245         // Wait for all goroutines to finish
246         for done := 0; done < 2; {
247                 done += <-sem
248         }
249
250         // Double check that we actually wrote the blocks we expected to write.
251         buf, err := v.Get(TEST_HASH)
252         if err != nil {
253                 t.Errorf("Get #1: %v", err)
254         }
255         if bytes.Compare(buf, TEST_BLOCK) != 0 {
256                 t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
257         }
258
259         buf, err = v.Get(TEST_HASH_2)
260         if err != nil {
261                 t.Errorf("Get #2: %v", err)
262         }
263         if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
264                 t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
265         }
266
267         buf, err = v.Get(TEST_HASH_3)
268         if err != nil {
269                 t.Errorf("Get #3: %v", err)
270         }
271         if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
272                 t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
273         }
274 }
275
276 func TestIsFull(t *testing.T) {
277         v := TempUnixVolume(t, false)
278         defer _teardown(v)
279
280         full_path := v.root + "/full"
281         now := fmt.Sprintf("%d", time.Now().Unix())
282         os.Symlink(now, full_path)
283         if !v.IsFull() {
284                 t.Errorf("%s: claims not to be full", v)
285         }
286         os.Remove(full_path)
287
288         // Test with an expired /full link.
289         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
290         os.Symlink(expired, full_path)
291         if v.IsFull() {
292                 t.Errorf("%s: should no longer be full", v)
293         }
294 }