ebb8421d9e1d3dbf6b8fce1f3fca0953568de931
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "regexp"
9         "sort"
10         "strings"
11         "syscall"
12         "testing"
13         "time"
14 )
15
16 func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
17         d, err := ioutil.TempDir("", "volume_test")
18         if err != nil {
19                 t.Fatal(err)
20         }
21         return &UnixVolume{
22                 root:      d,
23                 serialize: serialize,
24                 readonly:  readonly,
25         }
26 }
27
28 func _teardown(v *UnixVolume) {
29         os.RemoveAll(v.root)
30 }
31
32 // _store writes a Keep block directly into a UnixVolume, bypassing
33 // the overhead and safeguards of Put(). Useful for storing bogus data
34 // and isolating unit tests from Put() behavior.
35 func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
36         blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
37         if err := os.MkdirAll(blockdir, 0755); err != nil {
38                 t.Fatal(err)
39         }
40
41         blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
42         if f, err := os.Create(blockpath); err == nil {
43                 f.Write(block)
44                 f.Close()
45         } else {
46                 t.Fatal(err)
47         }
48 }
49
50 func TestGet(t *testing.T) {
51         v := TempUnixVolume(t, false, false)
52         defer _teardown(v)
53         _store(t, v, TEST_HASH, TEST_BLOCK)
54
55         buf, err := v.Get(TEST_HASH)
56         if err != nil {
57                 t.Error(err)
58         }
59         if bytes.Compare(buf, TEST_BLOCK) != 0 {
60                 t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
61         }
62 }
63
64 func TestGetNotFound(t *testing.T) {
65         v := TempUnixVolume(t, false, false)
66         defer _teardown(v)
67         _store(t, v, TEST_HASH, TEST_BLOCK)
68
69         buf, err := v.Get(TEST_HASH_2)
70         switch {
71         case os.IsNotExist(err):
72                 break
73         case err == nil:
74                 t.Errorf("Read should have failed, returned %s", string(buf))
75         default:
76                 t.Errorf("Read expected ErrNotExist, got: %s", err)
77         }
78 }
79
80 func TestIndexTo(t *testing.T) {
81         v := TempUnixVolume(t, false, false)
82         defer _teardown(v)
83
84         _store(t, v, TEST_HASH, TEST_BLOCK)
85         _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
86         _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
87
88         buf := new(bytes.Buffer)
89         v.IndexTo("", buf)
90         index_rows := strings.Split(string(buf.Bytes()), "\n")
91         sort.Strings(index_rows)
92         sorted_index := strings.Join(index_rows, "\n")
93         m, err := regexp.MatchString(
94                 `^\n`+TEST_HASH+`\+\d+ \d+\n`+
95                         TEST_HASH_3+`\+\d+ \d+\n`+
96                         TEST_HASH_2+`\+\d+ \d+$`,
97                 sorted_index)
98         if err != nil {
99                 t.Error(err)
100         } else if !m {
101                 t.Errorf("Got index %q for empty prefix", sorted_index)
102         }
103
104         for _, prefix := range []string{"f", "f15", "f15ac"} {
105                 buf = new(bytes.Buffer)
106                 v.IndexTo(prefix, buf)
107                 m, err := regexp.MatchString(`^`+TEST_HASH_2+`\+\d+ \d+\n$`, string(buf.Bytes()))
108                 if err != nil {
109                         t.Error(err)
110                 } else if !m {
111                         t.Errorf("Got index %q for prefix %q", string(buf.Bytes()), prefix)
112                 }
113         }
114 }
115
116 func TestPut(t *testing.T) {
117         v := TempUnixVolume(t, false, false)
118         defer _teardown(v)
119
120         err := v.Put(TEST_HASH, TEST_BLOCK)
121         if err != nil {
122                 t.Error(err)
123         }
124         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
125         if buf, err := ioutil.ReadFile(p); err != nil {
126                 t.Error(err)
127         } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
128                 t.Errorf("Write should have stored %s, did store %s",
129                         string(TEST_BLOCK), string(buf))
130         }
131 }
132
133 func TestPutBadVolume(t *testing.T) {
134         v := TempUnixVolume(t, false, false)
135         defer _teardown(v)
136
137         os.Chmod(v.root, 000)
138         err := v.Put(TEST_HASH, TEST_BLOCK)
139         if err == nil {
140                 t.Error("Write should have failed")
141         }
142 }
143
144 func TestUnixVolumeReadonly(t *testing.T) {
145         v := TempUnixVolume(t, false, false)
146         defer _teardown(v)
147
148         // First write something before marking readonly
149         err := v.Put(TEST_HASH, TEST_BLOCK)
150         if err != nil {
151                 t.Error("got err %v, expected nil", err)
152         }
153
154         v.readonly = true
155
156         _, err = v.Get(TEST_HASH)
157         if err != nil {
158                 t.Error("got err %v, expected nil", err)
159         }
160
161         err = v.Put(TEST_HASH, TEST_BLOCK)
162         if err != MethodDisabledError {
163                 t.Error("got err %v, expected MethodDisabledError", err)
164         }
165
166         err = v.Touch(TEST_HASH)
167         if err != MethodDisabledError {
168                 t.Error("got err %v, expected MethodDisabledError", err)
169         }
170
171         err = v.Delete(TEST_HASH)
172         if err != MethodDisabledError {
173                 t.Error("got err %v, expected MethodDisabledError", err)
174         }
175 }
176
177 // TestPutTouch
178 //     Test that when applying PUT to a block that already exists,
179 //     the block's modification time is updated.
180 func TestPutTouch(t *testing.T) {
181         v := TempUnixVolume(t, false, false)
182         defer _teardown(v)
183
184         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
185                 t.Error(err)
186         }
187
188         // We'll verify { t0 < threshold < t1 }, where t0 is the
189         // existing block's timestamp on disk before Put() and t1 is
190         // its timestamp after Put().
191         threshold := time.Now().Add(-time.Second)
192
193         // Set the stored block's mtime far enough in the past that we
194         // can see the difference between "timestamp didn't change"
195         // and "timestamp granularity is too low".
196         {
197                 oldtime := time.Now().Add(-20 * time.Second).Unix()
198                 if err := syscall.Utime(v.blockPath(TEST_HASH),
199                         &syscall.Utimbuf{oldtime, oldtime}); err != nil {
200                         t.Error(err)
201                 }
202
203                 // Make sure v.Mtime() agrees the above Utime really worked.
204                 if t0, err := v.Mtime(TEST_HASH); err != nil || t0.IsZero() || !t0.Before(threshold) {
205                         t.Errorf("Setting mtime failed: %v, %v", t0, err)
206                 }
207         }
208
209         // Write the same block again.
210         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
211                 t.Error(err)
212         }
213
214         // Verify threshold < t1
215         t1, err := v.Mtime(TEST_HASH)
216         if err != nil {
217                 t.Error(err)
218         }
219         if t1.Before(threshold) {
220                 t.Errorf("t1 %v must be >= threshold %v after v.Put ",
221                         t1, threshold)
222         }
223 }
224
225 // Serialization tests: launch a bunch of concurrent
226 //
227 // TODO(twp): show that the underlying Read/Write operations executed
228 // serially and not concurrently. The easiest way to do this is
229 // probably to activate verbose or debug logging, capture log output
230 // and examine it to confirm that Reads and Writes did not overlap.
231 //
232 // TODO(twp): a proper test of I/O serialization requires that a
233 // second request start while the first one is still underway.
234 // Guaranteeing that the test behaves this way requires some tricky
235 // synchronization and mocking.  For now we'll just launch a bunch of
236 // requests simultaenously in goroutines and demonstrate that they
237 // return accurate results.
238 //
239 func TestGetSerialized(t *testing.T) {
240         // Create a volume with I/O serialization enabled.
241         v := TempUnixVolume(t, true, false)
242         defer _teardown(v)
243
244         _store(t, v, TEST_HASH, TEST_BLOCK)
245         _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
246         _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
247
248         sem := make(chan int)
249         go func(sem chan int) {
250                 buf, err := v.Get(TEST_HASH)
251                 if err != nil {
252                         t.Errorf("err1: %v", err)
253                 }
254                 if bytes.Compare(buf, TEST_BLOCK) != 0 {
255                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
256                 }
257                 sem <- 1
258         }(sem)
259
260         go func(sem chan int) {
261                 buf, err := v.Get(TEST_HASH_2)
262                 if err != nil {
263                         t.Errorf("err2: %v", err)
264                 }
265                 if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
266                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
267                 }
268                 sem <- 1
269         }(sem)
270
271         go func(sem chan int) {
272                 buf, err := v.Get(TEST_HASH_3)
273                 if err != nil {
274                         t.Errorf("err3: %v", err)
275                 }
276                 if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
277                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
278                 }
279                 sem <- 1
280         }(sem)
281
282         // Wait for all goroutines to finish
283         for done := 0; done < 3; {
284                 done += <-sem
285         }
286 }
287
288 func TestPutSerialized(t *testing.T) {
289         // Create a volume with I/O serialization enabled.
290         v := TempUnixVolume(t, true, false)
291         defer _teardown(v)
292
293         sem := make(chan int)
294         go func(sem chan int) {
295                 err := v.Put(TEST_HASH, TEST_BLOCK)
296                 if err != nil {
297                         t.Errorf("err1: %v", err)
298                 }
299                 sem <- 1
300         }(sem)
301
302         go func(sem chan int) {
303                 err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
304                 if err != nil {
305                         t.Errorf("err2: %v", err)
306                 }
307                 sem <- 1
308         }(sem)
309
310         go func(sem chan int) {
311                 err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
312                 if err != nil {
313                         t.Errorf("err3: %v", err)
314                 }
315                 sem <- 1
316         }(sem)
317
318         // Wait for all goroutines to finish
319         for done := 0; done < 3; {
320                 done += <-sem
321         }
322
323         // Double check that we actually wrote the blocks we expected to write.
324         buf, err := v.Get(TEST_HASH)
325         if err != nil {
326                 t.Errorf("Get #1: %v", err)
327         }
328         if bytes.Compare(buf, TEST_BLOCK) != 0 {
329                 t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
330         }
331
332         buf, err = v.Get(TEST_HASH_2)
333         if err != nil {
334                 t.Errorf("Get #2: %v", err)
335         }
336         if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
337                 t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
338         }
339
340         buf, err = v.Get(TEST_HASH_3)
341         if err != nil {
342                 t.Errorf("Get #3: %v", err)
343         }
344         if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
345                 t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
346         }
347 }
348
349 func TestIsFull(t *testing.T) {
350         v := TempUnixVolume(t, false, false)
351         defer _teardown(v)
352
353         full_path := v.root + "/full"
354         now := fmt.Sprintf("%d", time.Now().Unix())
355         os.Symlink(now, full_path)
356         if !v.IsFull() {
357                 t.Errorf("%s: claims not to be full", v)
358         }
359         os.Remove(full_path)
360
361         // Test with an expired /full link.
362         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
363         os.Symlink(expired, full_path)
364         if v.IsFull() {
365                 t.Errorf("%s: should no longer be full", v)
366         }
367 }
368
369 func TestNodeStatus(t *testing.T) {
370         v := TempUnixVolume(t, false, false)
371         defer _teardown(v)
372
373         // Get node status and make a basic sanity check.
374         volinfo := v.Status()
375         if volinfo.MountPoint != v.root {
376                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
377         }
378         if volinfo.DeviceNum == 0 {
379                 t.Errorf("uninitialized device_num in %v", volinfo)
380         }
381         if volinfo.BytesFree == 0 {
382                 t.Errorf("uninitialized bytes_free in %v", volinfo)
383         }
384         if volinfo.BytesUsed == 0 {
385                 t.Errorf("uninitialized bytes_used in %v", volinfo)
386         }
387 }